[Zope-Coders] Zope 2.7 branch
Fred L. Drake, Jr.
fred@zope.com
Mon, 14 Jul 2003 13:47:00 -0400
--8bK25k7w9Y
Content-Type: text/plain; charset=us-ascii
Content-Description: message body and .signature
Content-Transfer-Encoding: 7bit
Last week, I wrote:
> I'll take a look at what it takes to actually implement "closing" a
> branch; I suspect it's possible in the typical case.
I then followed up, reporting that it wasn't doable, which left me
with a generally unsatisfied feeling. (That might have been an
unfulfilled longing for Thai food, though.)
So I've looked into this some more, and determined... I was wrong!
It can be done. I've written a script that does so (attached). I
call it: "branchctl" ! (Isn't it a wonder that anything can be
achieved with a *ctl script? ;)
For each branch (including the trunk), two controls are available:
- whether the branch is open for commits, and
- whether commits involving that branch and any other should be
permitted.
Both default to true since that's the normal CVS behavior.
The configuration file should be present in the CVSROOT administrative
directory alongside the script. Two keys are checked for each branch:
open
Indicates whether checkins are allowed. The value may be "true"
or "false"; the default is "true".
cross-branch-commits
Indicates whether a single commit may span multiple branches. The
implementation only checks this for a single directory at a time;
commits that affect one branch in one directory and another branch
in a separate directory are not (yet) detected. The value may be
"true" or "false"; the default is "true".
(Bet you never even thought of anyone doing anything so twisted as
this!)
An example configuration may look like this:
----------------------------------------
[general]
open = true
cross-branch-commits = false
[branch unmaintained-maintenance-branch]
; we don't like this branch any more
open = false
----------------------------------------
This configuration data can co-exist in the configuration file for the
new version of syncmail that's currently in development, so all your
branch-control configuration can be in one place if you're also using
syncmail (cvs.zope.org CVS is not). The default configuration file
names are different for the two scripts; I'll suggest appropriate
usage would be to create a single file and name it explicitly using
the --config option for each script.
This script may provide a good foundation for additional forms of
access control for CVS repositories as well.
-Fred
--
Fred L. Drake, Jr. <fred at zope.com>
PythonLabs at Zope Corporation
--8bK25k7w9Y
Content-Type: text/plain
Content-Description: branch access control script
Content-Disposition: inline;
filename="branchctl"
Content-Transfer-Encoding: 7bit
#! /usr/bin/env python
#
# Script to control whether CVS branches are open or closed for
# changes.
#
# This is intended to be called from the commitinfo hook.
"""\
Script which controls the use of branches within a CVS repository.
Usage:
%(PROGRAM)s [options] [<%%S> [email-addr ...]]
Where options are:
--config=file
Use file as the configuration file. By default, a file named
%(DEFAULT_CONFIGURATION_FILE)s is used if it exists. If the name of the
configuration file is relative, it is interpreted as relative
to the CVSROOT administrative directory of the repository.
--config is incompatible with --no-config.
--no-config
Do not use a configuration file. This can be used to disable
the use of the default configuration file. --no-config is
incompatible with --config.
--cvsroot=<path>
Use <path> as the environment variable CVSROOT. Otherwise this
variable must exist in the environment.
--quiet / -q
Don't print as much status to stdout.
--help / -h
Print this text.
"""
# The configuration data can co-exist in the same configuration file
# with the configuration for the "new-world" version of syncmail.
import getopt
import os
import re
import socket
import string
import sys
try:
    from socket import getfqdn
except ImportError:
    # Python 1.5.2 :(  -- the socket module does not provide getfqdn()
    # there, so supply a small stand-in.
    def getfqdn():
        """Return the first known name for this host that contains a dot.

        Candidates are tried in this order: the result of gethostname(),
        the primary name reported by gethostbyaddr(), then its aliases.
        If none of them contains a dot, 'localhost.localdomain' is
        returned.
        """
        local_name = socket.gethostname()
        host_info = socket.gethostbyaddr(socket.gethostbyname(local_name))
        candidates = [local_name, host_info[0]] + host_info[1]
        for candidate in candidates:
            if '.' in candidate:
                return candidate
        return 'localhost.localdomain'
# Name of the configuration file that is used when it exists and no
# other file was requested on the command line.
DEFAULT_CONFIGURATION_FILE = "branchctl.conf"

# Built-in option values.  All values are strings, exactly as they would
# appear in a configuration file.
DEFAULT_CONFIGURATION = {
    "cross-branch-commits": "true",
    "open": "true",
    "verbose": "true",
    }
def usage(code, msg=''):
    """Print the usage text, then an optional message, and exit.

    code -- exit status handed to sys.exit().
    msg  -- optional extra text (or exception object) printed after the
            usage text; nothing extra is printed when it is false.

    Never returns: sys.exit() always raises SystemExit.
    """
    # The usage text interpolates %(PROGRAM)s, but this script defines no
    # module-level PROGRAM, so formatting with globals() alone fails with
    # a KeyError.  Supply the value here; a module-level PROGRAM, should
    # one be added, still takes precedence.
    values = {"PROGRAM": sys.argv[0]}
    values.update(globals())
    # __doc__ is None when docstrings are stripped (python -OO).
    print((__doc__ or "") % values)
    if msg:
        print(msg)
    sys.exit(code)
class CVSEntry:
    """Plain record describing one file listed in a CVS Entries file.

    Every constructor argument is stored unchanged as the attribute of
    the same name: name, revision, timestamp, conflict, options, tagdate.
    """

    def __init__(self, name, revision, timestamp, conflict, options, tagdate):
        self.name, self.revision = name, revision
        self.timestamp, self.conflict = timestamp, conflict
        self.options, self.tagdate = options, tagdate
def get_entry(prefix, mapping, line, filename):
    """Parse one Entries-style line and return the CVSEntry for its file.

    prefix   -- directory prefix joined onto the file name to form the
                mapping key (None or empty for no prefix).
    mapping  -- dictionary of key -> CVSEntry; a new entry is created and
                stored here when the key is not present yet.
    line     -- text of the form /name/revision/timestamp/options/tagdate.
    filename -- name of the file the line was read from; used only in
                error messages.

    Returns the entry already stored for the file if there is one,
    otherwise the newly created entry.

    Raises ValueError when the line does not have the expected number of
    fields, or when its last field is present but is not a sticky tag.
    """
    fields = line.strip().split("/")
    if len(fields) != 6:
        raise ValueError("%s: malformed entry line: %s"
                         % (filename, repr(line)))
    name, revision, timestamp, options, tagdate = fields[1:]
    tagdate = tagdate.rstrip() or None
    if tagdate:
        # Only a "T" followed by the tag name is understood.  This is an
        # explicit check rather than an assert so that it is still
        # performed when Python runs with -O.
        if tagdate[0] != "T":
            raise ValueError("%s: unsupported sticky value for %s: %s"
                             % (filename, name, repr(tagdate)))
        tagdate = tagdate[1:]
    key = namekey(prefix, name)
    try:
        return mapping[key]
    except KeyError:
        pass
    if revision == "0":
        # NOTE(review): presumably "0" marks a file with no committed
        # revision yet -- confirm against the CVS Entries format.
        revision = None
    if "+" in timestamp:
        # Split at the first "+" only, so an additional "+" later in the
        # field cannot break the two-way unpacking.
        timestamp, conflict = timestamp.split("+", 1)
    else:
        conflict = None
    entry = CVSEntry(key, revision, timestamp, conflict, options, tagdate)
    mapping[key] = entry
    return entry
def namekey(prefix, name):
    """Return the mapping key for `name` located under `prefix`.

    A false prefix (None or the empty string) leaves the name unchanged;
    otherwise the two are joined as path components.
    """
    if not prefix:
        return name
    return os.path.join(prefix, name)
def load_entries(prefix=None):
    """Read a CVS Entries file and return a dictionary of key -> CVSEntry.

    prefix -- optional directory containing the CVS administrative
              directory; when None, the file is opened as CVS/Entries
              relative to the current working directory.

    Only lines that begin with "/" (ordinary files) are used; every other
    line is ignored.  Subdirectories are not visited: the Entries.Log
    files that would be needed there haven't been written to the subdirs
    yet, so recursing would not do any good.

    Errors from open() and from get_entry() propagate to the caller; the
    file is closed in either case.
    """
    if prefix is not None:
        entries_fn = os.path.join(prefix, "CVS", "Entries")
    else:
        entries_fn = os.path.join("CVS", "Entries")
    mapping = {}
    f = open(entries_fn)
    try:
        for line in f.readlines():
            if line[0] == "/":
                # normal file
                get_entry(prefix, mapping, line, entries_fn)
            # else: directory or bogus Entries line
    finally:
        # Close the file even when a line cannot be parsed.
        f.close()
    return mapping
def load_change_info(mapping, prefix=None):
    """Annotate entries with the changes recorded in CVS/Entries.Log.

    mapping -- dictionary of key -> CVSEntry; updated in place.
    prefix  -- optional directory containing the CVS administrative
               directory; None means the current working directory.

    For each line of the log, "A" sets new_revision on the matching
    entry to the revision given on that line and "R" sets it to None.
    Reading stops at the first line whose second character is not a
    space.  Afterwards every entry that has no new_revision attribute is
    reported on stdout and removed from the mapping.

    Returns the same mapping object.
    """
    if prefix is not None:
        entries_log_fn = os.path.join(prefix, "CVS", "Entries.Log")
    else:
        entries_log_fn = os.path.join("CVS", "Entries.Log")
    if os.path.isfile(entries_log_fn):
        f = open(entries_log_fn)
        try:
            for line in f.readlines():
                if line[1:2] != ' ':
                    # really old version of CVS
                    break
                entry = get_entry(prefix, mapping, line[2:], entries_log_fn)
                parts = line.split("/")[1:]
                if line[0] == "A":
                    # adding a file
                    entry.new_revision = parts[1]
                elif line[0] == "R":
                    # removing a file
                    entry.new_revision = None
        finally:
            # Close the log even when a line cannot be parsed.
            f.close()
    # Iterate over a snapshot: entries are deleted from the mapping inside
    # the loop, which is only safe when the sequence being walked is not
    # a live view of the dictionary.
    for entry in list(mapping.values()):
        if not hasattr(entry, "new_revision"):
            print("confused about file %s -- ignoring" % entry.name)
            del mapping[entry.name]
    return mapping
def load_branch_name():
    """Return the tag name recorded in CVS/Tag, or None.

    The file is looked up relative to the current working directory.
    None is returned when the file does not exist or when its first line
    does not start with "T"; otherwise the rest of that line is returned.
    """
    tag_fn = os.path.join("CVS", "Tag")
    if not os.path.isfile(tag_fn):
        return None
    f = open(tag_fn)
    first_line = f.readline()
    f.close()
    first_line = first_line.strip()
    if first_line[:1] != "T":
        return None
    return first_line[1:]
# Spellings accepted for boolean option values.
TRUE_VALUES = ("true", "on", "enabled")
FALSE_VALUES = ("false", "off", "disabled")
class OptionLookup:
    """Look up option values in an ordered list of dictionaries.

    The first dictionary that defines an option wins.  Values are passed
    through a Replacer that knows the names BRANCH, CVSROOT and HOSTNAME
    before they are returned.
    """

    def __init__(self, dicts, branch=None):
        """Create a lookup.

        dicts  -- sequence of dictionaries, searched in order.
        branch -- value substituted for BRANCH ("" when false).
        """
        self._dicts = dicts
        self._replace = Replacer({
            "BRANCH": branch or "",
            "CVSROOT": os.environ.get("CVSROOT", ""),
            "HOSTNAME": os.environ.get("HOSTNAME") or getfqdn(),
            })

    def get(self, option):
        """Return the substituted value of option, or None if it is unset."""
        for options in self._dicts:
            v = options.get(option)
            if v is not None:
                return self._replace(v)
        return None

    def getbool(self, option):
        """Return 1 or 0 for a boolean option, or None if it is unset.

        The value is compared case-insensitively against TRUE_VALUES and
        FALSE_VALUES; anything else raises ValueError.
        """
        v = self.get(option)
        if v is not None:
            v = v.lower()
            if v in TRUE_VALUES:
                v = 1
            elif v in FALSE_VALUES:
                v = 0
            else:
                raise ValueError("illegal boolean value: %s" % repr(v))
        return v

    def getint(self, option):
        """Return the option converted with int(), or None if it is unset."""
        v = self.get(option)
        if v is not None:
            v = int(v)
        return v

    def getaddress(self, option):
        """Return (host, port) for a host:port or host string.

        A part that is omitted is taken from the module-level names
        MAILHOST / MAILPORT when those are defined, and is None otherwise.
        Raises ValueError when the port is not an integer or the value
        contains more than one colon.
        """
        # This script does not define MAILHOST or MAILPORT itself, so they
        # are looked up defensively rather than failing with a NameError.
        default_host = globals().get("MAILHOST")
        default_port = globals().get("MAILPORT")
        v = self.get(option)
        if v is None:
            return default_host, default_port
        elif ":" in v:
            h, p = v.split(":")
            p = int(p)
            return h or default_host, p
        else:
            return v, default_port
# Support for $VARIABLE replacement.

class Replacer:
    """Callable that expands $NAME and ${NAME} references in a string.

    Names are looked up in the dictionary given to the constructor;
    names that are not in the dictionary expand to the empty string.
    """

    def __init__(self, vars):
        self._vars = vars
        pattern = re.compile(
            r"\$([a-zA-Z][a-zA-Z_]*\b|\{[a-zA-Z][a-zA-Z_]*\})")
        self._search = pattern.search

    def __call__(self, v):
        """Return v with every reference replaced, scanning left to right."""
        pieces = []
        remainder = v
        while 1:
            literal, name, remainder = self._split(remainder)
            pieces.append(literal)
            if not name:
                break
            pieces.append(self._vars.get(name, ""))
        return "".join(pieces)

    def _split(self, s):
        """Split s around its first reference: (before, name, after).

        When s contains no reference the result is (s, None, '').
        """
        found = self._search(s)
        if found is None:
            return s, None, ''
        name = found.group(1)
        if name[0] == "{":
            # strip the surrounding braces of the ${NAME} form
            name = name[1:-1]
        return s[:found.start()], name, s[found.end():]
def get_section_as_dict(config, section):
    """Return the options of one configuration section as a dictionary.

    config  -- object providing has_section(), options() and get().
    section -- name of the section to copy.

    Values are fetched with raw=1.  A section that does not exist yields
    an empty dictionary.
    """
    result = {}
    if not config.has_section(section):
        return result
    for name in config.options(section):
        result[name] = config.get(section, name, raw=1)
    return result
from ConfigParser import ConfigParser

class ConfigParser(ConfigParser):
    """ConfigParser subclass that overrides the section-header pattern.

    The subclass deliberately reuses the base class's name; see the note
    on __SECTCRE below before renaming it.
    """
    # Regular expressions for parsing section headers and options,
    # from the Python 2.3 version of ConfigParser.
    # The header group accepts any run of characters other than "]".
    SECTCRE = re.compile(
        r'\['                                 # [
        r'(?P<header>[^]]+)'                  # very permissive!
        r'\]'                                 # ]
        )
    # For compatibility with older versions:
    # Inside this class body the name below is mangled to
    # _ConfigParser__SECTCRE.  NOTE(review): presumably older versions of
    # the base class read the pattern under that private name, which only
    # lines up because this class is also called ConfigParser -- confirm
    # before renaming the class or the attribute.
    __SECTCRE = SECTCRE
class ConfigurationDatabase:
    """Parsed configuration file combined with command-line settings.

    Attributes:
      args    -- positional arguments, handed on to every Options object
      cmdline -- dictionary of options given on the command line
      config  -- ConfigParser instance, or None when no file was loaded
    """

    def __init__(self, filename, cmdline, args):
        """Load filename if it names an existing file.

        A false or non-existent filename is not an error; the database
        then consists of the command-line settings only.
        """
        self.args = args
        self.cmdline = cmdline
        self.config = None
        if filename and os.path.isfile(filename):
            self.config = ConfigParser()
            if hasattr(self.config, "readfp"):
                fp = open(filename)
                try:
                    self.config.readfp(fp, filename)
                finally:
                    # Do not leave the file open, even if parsing fails.
                    fp.close()
            else:
                # We have to use this old method for compatibility with
                # ancient versions of Python.
                self.config.read([filename])

    def get_config(self, branch):
        """Return the Options that apply to branch (None or '' for none).

        Lookup order, first match wins: the "branch NAME" section, the
        "branch" section, the command line, the "general" section, and
        finally DEFAULT_CONFIGURATION.  Without a configuration file only
        the command line and the defaults are consulted.
        """
        dicts = []
        cp = self.config
        if cp is not None:
            if branch:
                dicts.append(get_section_as_dict(cp, "branch " + branch))
            dicts.append(get_section_as_dict(cp, "branch"))
            dicts.append(self.cmdline)
            dicts.append(get_section_as_dict(cp, "general"))
        else:
            dicts.append(self.cmdline)
        # Drop empty dictionaries.  A real list is built here because the
        # defaults are appended to it next.
        dicts = [d for d in dicts if d]
        dicts.append(DEFAULT_CONFIGURATION)
        return Options(OptionLookup(dicts, branch), self.args)
class OptionsBase:
    """Settings common to every option set.

    Attributes:
      args    -- positional arguments handed to the constructor
      verbose -- result of config.getbool('verbose')
      repodir -- stripped first line of CVS/Repository (relative to the
                 current working directory), or os.curdir when that file
                 does not exist
    """

    def __init__(self, config, args):
        self.args = args
        self.verbose = config.getbool('verbose')
        repo_fn = os.path.join("CVS", "Repository")
        if not os.path.isfile(repo_fn):
            self.repodir = os.curdir
        else:
            f = open(repo_fn)
            first_line = f.readline()
            f.close()
            self.repodir = first_line.strip()
def get_admin_file(name):
    """Return the path of `name` inside the CVSROOT administrative directory.

    When the CVSROOT environment variable is set, the result is the
    absolute form of $CVSROOT/CVSROOT/name; otherwise it is the relative
    path CVSROOT/name.
    """
    try:
        root = os.environ["CVSROOT"]
    except KeyError:
        return os.path.join("CVSROOT", name)
    return os.path.abspath(os.path.join(root, "CVSROOT", name))
def load_configuration(args):
    """Build the ConfigurationDatabase for the given argument list.

    args -- command-line arguments (without the program name).

    If the parsed command line carries a 'config-file' setting, that name
    is resolved with get_admin_file() and the setting is removed from the
    command-line dictionary.  Otherwise the default configuration file is
    used when it exists, and no file at all when it does not.
    """
    cmdline, args = load_cmdline(args)
    try:
        requested = cmdline['config-file']
    except KeyError:
        # Nothing requested: fall back to the default file, if present.
        cfgfile = get_admin_file(DEFAULT_CONFIGURATION_FILE)
        if not os.path.isfile(cfgfile):
            cfgfile = None
    else:
        cfgfile = get_admin_file(requested)
        del cmdline['config-file']
    return ConfigurationDatabase(cfgfile, cmdline, args)
class Options(OptionsBase):
    """Option set for branch control: the common settings plus policy.

    Additional attributes:
      open          -- result of config.getbool("open")
      cross_commits -- result of config.getbool("cross-branch-commits")
    """

    def __init__(self, config, args):
        OptionsBase.__init__(self, config, args)
        for attribute, option in (("open", "open"),
                                  ("cross_commits", "cross-branch-commits")):
            setattr(self, attribute, config.getbool(option))
def load_cmdline(args):
    """Parse the command-line arguments.

    args -- argument list without the program name.

    Returns (cmdline, args): a dictionary of settings made by options and
    the list of remaining positional arguments.  --cvsroot is not stored
    in the dictionary; it sets CVSROOT in os.environ instead.

    Parse errors, a repeated setting, and -h/--help all end in usage(),
    which exits the process.
    """
    try:
        opts, args = getopt.getopt(
            args, 'hq',
            ['cvsroot=', 'config=', 'no-config', 'help', 'quiet'])
    except getopt.error:
        usage(2, sys.exc_info()[1])
    cmdline = {}

    def record(option, value):
        # --config and --no-config share one key, so giving both is
        # reported here as well.
        if option in cmdline:
            usage(2, "can't set option more than once")
        cmdline[option] = value

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)
        elif opt == '--cvsroot':
            os.environ["CVSROOT"] = arg
        elif opt == "--config":
            record('config-file', arg)
        elif opt == '--no-config':
            record('config-file', '')
        elif opt in ('-q', '--quiet'):
            record('verbose', 'false')
    return cmdline, args
def main():
    """Check the entries of the current directory against branch policy.

    Entries are grouped by tag (an entry's own tag, else the tag from
    CVS/Tag, else None for the trunk).  For every group whose branch is
    not open, or which forbids cross-branch commits while more than one
    group is present, a message is printed.  If anything was printed the
    process exits with status 1; otherwise the function simply returns.
    """
    branch = load_branch_name()
    configdb = load_configuration(sys.argv[1:])

    by_tag = {}
    for change in load_entries().values():
        by_tag.setdefault(change.tagdate or branch, []).append(change)

    crossed_branches = len(by_tag) > 1
    denied = 0
    for tag in by_tag.keys():
        config = configdb.get_config(tag)
        if tag:
            branchname = "branch " + tag
        else:
            branchname = "trunk"
        problems = []
        if not config.open:
            problems.append("is closed")
        if crossed_branches and not config.cross_commits:
            problems.append("does not permit cross-branch commits")
        for problem in problems:
            if not denied:
                # blank line ahead of the first message
                denied = 1
                print("")
            print("%s %s" % (branchname, problem))
    if denied:
        print("")
        sys.exit(1)
# Run the check only when this file is executed as a script.
if __name__ == '__main__':
    main()
--8bK25k7w9Y--