[Zope-Coders] Zope 2.7 branch

Fred L. Drake, Jr. fred@zope.com
Mon, 14 Jul 2003 13:47:00 -0400


--8bK25k7w9Y
Content-Type: text/plain; charset=us-ascii
Content-Description: message body and .signature
Content-Transfer-Encoding: 7bit


Last week, I wrote:
 > I'll take a look at what it takes to actually implement "closing" a
 > branch; I suspect it's possible in the typical case.

I then followed up, reporting that it wasn't doable, which left me
with a generally unsatisfied feeling.  (That might have been an
unfulfilled longing for Thai food, though.)

So I've looked into this some more, and determined... I was wrong!

It can be done.  I've written a script that does so (attached).  I
call it:  "branchctl" !  (Isn't it a wonder that anything can be
achieved with a *ctl script? ;)

For each branch (including the trunk), two controls are available:

- whether the branch is open for commits, and

- whether commits involving that branch and any other should be
  permitted.

Both default to true since that's the normal CVS behavior.

The configuration file should be present in the CVSROOT administrative
directory alongside the script.  Two keys are checked for each branch:

open
    Indicates whether checkins are allowed.  The value may be "true"
    or "false"; the default is "true".

cross-branch-commits
    Indicates whether a single commit may span multiple branches.  The
    implementation only checks this for a single directory at a time;
    commits that affect one branch in one directory and another branch
    in a separate directory are not (yet) detected.  The value may be
    "true" or "false"; the default is "true".

    (Bet you never even thought of anyone doing anything so twisted as
    this!)
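The per-directory check described above can be sketched roughly as
follows.  This is a simplified Python 3 illustration, not the attached
script itself: the function names and the sample file names are made
up, and it assumes the usual "/name/revision/timestamp/options/tagdate"
layout of lines in CVS/Entries.

```python
def branches_in_directory(entries_lines, directory_tag=None):
    """Map each branch tag (None for the trunk) to the file names on it."""
    by_branch = {}
    for line in entries_lines:
        line = line.strip()
        if not line.startswith("/"):
            continue  # directory entries ("D/...") and other lines are skipped
        _, name, revision, timestamp, options, tagdate = line.split("/")
        if tagdate.startswith("T"):
            tag = tagdate[1:]      # sticky tag recorded for this file
        else:
            # No sticky tag on the file: fall back to the directory's tag.
            # (Sticky dates are not handled in this sketch.)
            tag = directory_tag
        by_branch.setdefault(tag, []).append(name)
    return by_branch


def spans_branches(entries_lines, directory_tag=None):
    """True if the files of one directory sit on more than one branch."""
    return len(branches_in_directory(entries_lines, directory_tag)) > 1


# Hypothetical CVS/Entries content for a single directory.
SAMPLE_ENTRIES = [
    "/README.txt/1.4/Mon Jul 14 17:00:00 2003//",
    "/setup.py/1.2.2.1/Mon Jul 14 17:05:00 2003//Trelease-branch",
    "D/docs////",
]

print(branches_in_directory(SAMPLE_ENTRIES))
print(spans_branches(SAMPLE_ENTRIES))
```

Because only one directory's Entries file is consulted per call, a
commit that touches different branches in different directories goes
unnoticed, which is the limitation mentioned above.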

An example configuration may look like this:

----------------------------------------
[general]
open = true
cross-branch-commits = false

[branch unmaintained-maintenance-branch]
; we don't like this branch any more
open = false
----------------------------------------
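
To make the lookup order concrete, here is a rough Python 3 sketch of
how a setting might be resolved against the example above: the
branch's own section first, then a bare [branch] section, then
[general], then the built-in default.  The lookup() helper is an
illustrative name, and the sketch leaves out the command-line
overrides that the attached script also consults.

```python
import configparser

EXAMPLE = """\
[general]
open = true
cross-branch-commits = false

[branch unmaintained-maintenance-branch]
; we don't like this branch any more
open = false
"""

DEFAULTS = {"open": "true", "cross-branch-commits": "true"}
TRUE_VALUES = ("true", "on", "enabled")
FALSE_VALUES = ("false", "off", "disabled")


def lookup(parser, branch, option):
    """Return the boolean setting of option for branch (None is the trunk)."""
    sections = []
    if branch:
        sections.append("branch " + branch)
        sections.append("branch")
    sections.append("general")
    value = DEFAULTS[option]
    for section in sections:
        # has_option() is simply false for sections that do not exist.
        if parser.has_option(section, option):
            value = parser.get(section, option, raw=True)
            break
    value = value.lower()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError("illegal boolean value: %r" % value)


parser = configparser.ConfigParser()
parser.read_string(EXAMPLE)

print(lookup(parser, None, "open"))
print(lookup(parser, "unmaintained-maintenance-branch", "open"))
print(lookup(parser, "unmaintained-maintenance-branch", "cross-branch-commits"))
```

With this configuration the trunk stays open, the unmaintained branch
is closed, and every branch inherits the [general] ban on cross-branch
commits.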

This configuration data can co-exist in the configuration file for the
new version of syncmail that's currently in development, so all your
branch-control configuration can live in one place if you're also using
syncmail (cvs.zope.org is not).  The default configuration file names
differ between the two scripts, so I suggest creating a single file
and naming it explicitly with the --config option of each script.
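
As a small illustration of how a --config value is interpreted, the
sketch below (Python 3, POSIX-style paths) resolves a relative name
against the CVSROOT administrative directory and leaves an absolute
name alone.  The function name, the file name "cvs-hooks.conf" and the
repository path are invented for the example.

```python
import os


def resolve_config_path(name, cvsroot):
    """Return the path that a --config value would refer to."""
    # os.path.join() discards the earlier components when name is
    # absolute, so absolute names pass through unchanged.
    return os.path.abspath(os.path.join(cvsroot, "CVSROOT", name))


print(resolve_config_path("cvs-hooks.conf", "/cvs-repository"))
print(resolve_config_path("/etc/cvs-hooks.conf", "/cvs-repository"))
```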

This script may provide a good foundation for additional forms of
access control for CVS repositories as well.


  -Fred

-- 
Fred L. Drake, Jr.  <fred at zope.com>
PythonLabs at Zope Corporation


--8bK25k7w9Y
Content-Type: text/plain
Content-Description: branch access control script
Content-Disposition: inline;
	filename="branchctl"
Content-Transfer-Encoding: 7bit

#! /usr/bin/env python
#
# Script to control whether CVS branches are open or closed for
# changes.
#
# This is intended to be called from the commitinfo hook.

"""\
Script which controls the use of branches within a CVS repository.

Usage:

    %(PROGRAM)s [options] [<%%S> [email-addr ...]]

Where options are:

    --config=file
        Use file as the configuration file.  By default, a file named
        %(DEFAULT_CONFIGURATION_FILE)s is used if it exists.  If the name of the
        configuration file is relative, it is interpreted as relative
        to the CVSROOT administrative directory of the repository.
        --config is incompatible with --no-config.

    --no-config
        Do not use a configuration file.  This can be used to disable
        the use of the default configuration file.  --no-config is
        incompatible with --config.

    --cvsroot=<path>
        Use <path> as the environment variable CVSROOT.  Otherwise this
        variable must exist in the environment.

    --quiet / -q
        Don't print as much status to stdout.

    --help / -h
        Print this text.
"""

# The configuration data can co-exist in the same configuration file
# with the configuration for the "new-world" version of syncmail.

import getopt
import os
import re
import socket
import string
import sys

try:
    from socket import getfqdn
except ImportError:
    def getfqdn():
        # Python 1.5.2 :(
        hostname = socket.gethostname()
        byaddr = socket.gethostbyaddr(socket.gethostbyname(hostname))
        aliases = byaddr[1]
        aliases.insert(0, byaddr[0])
        aliases.insert(0, hostname)
        for fqdn in aliases:
            if '.' in fqdn:
                break
        else:
            fqdn = 'localhost.localdomain'
        return fqdn


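# Name used for this script in the usage message; usage() formats
# the module docstring with globals(), so this must be defined.
PROGRAM = sys.argv[0]

DEFAULT_CONFIGURATION_FILE = "branchctl.conf"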

DEFAULT_CONFIGURATION = {
    "verbose": "true",
    "open": "true",
    "cross-branch-commits": "true",
    }



def usage(code, msg=''):
    print __doc__ % globals()
    if msg:
        print msg
    sys.exit(code)



class CVSEntry:
    def __init__(self, name, revision, timestamp, conflict, options, tagdate):
        self.name = name
        self.revision = revision
        self.timestamp = timestamp
        self.conflict = conflict
        self.options = options
        self.tagdate = tagdate

def get_entry(prefix, mapping, line, filename):
    line = string.strip(line)
    parts = string.split(line, "/")
    _, name, revision, timestamp, options, tagdate = parts
    tagdate = string.rstrip(tagdate) or None
    if tagdate:
        assert tagdate[0] == "T"
        tagdate = tagdate[1:]
    key = namekey(prefix, name)
    try:
        entry = mapping[key]
    except KeyError:
        if revision == "0":
            revision = None
        if "+" in timestamp:
            timestamp, conflict = tuple(string.split(timestamp, "+"))
        else:
            conflict = None
        entry = CVSEntry(key, revision, timestamp, conflict,
                         options, tagdate)
        mapping[key] = entry
    return entry

def namekey(prefix, name):
    if prefix:
        return os.path.join(prefix, name)
    else:
        return name

def load_entries(prefix=None):
    if prefix is not None:
        entries_fn = os.path.join(prefix, "CVS", "Entries")
    else:
        entries_fn = os.path.join("CVS", "Entries")
    mapping = {}
    f = open(entries_fn)
    while 1:
        line = f.readline()
        if not line:
            break
##        if string.strip(line) == "D":
##            continue
        # we could recurse down subdirs, except the Entries.Log files
        # we need haven't been written to the subdirs yet, so it
        # doesn't do us any good
##        if line[0] == "D":
##            name = string.split(line, "/")[1]
##            dirname = namekey(prefix, name)
##            if os.path.isdir(dirname):
##                m = load_change_info(dirname)
##                mapping.update(m)
        if line[0] == "/":
            # normal file
            get_entry(prefix, mapping, line, entries_fn)
        # else: bogus Entries line
    f.close()
    return mapping

def load_change_info(mapping, prefix=None):
    if prefix is not None:
        entries_log_fn = os.path.join(prefix, "CVS", "Entries.Log")
    else:
        entries_log_fn = os.path.join("CVS", "Entries.Log")
    if os.path.isfile(entries_log_fn):
        f = open(entries_log_fn)
        while 1:
            line = f.readline()
            if not line:
                break
            if line[1:2] != ' ':
                # really old version of CVS
                break
            entry = get_entry(prefix, mapping, line[2:], entries_log_fn)
            parts = string.split(line, "/")[1:]
            if line[0] == "A":
                # adding a file
                entry.new_revision = parts[1]
            elif line[0] == "R":
                # removing a file
                entry.new_revision = None
        f.close()
    for entry in mapping.values():
        if not hasattr(entry, "new_revision"):
            print 'confused about file', entry.name, '-- ignoring'
            del mapping[entry.name]
    return mapping

def load_branch_name():
    tag_fn = os.path.join("CVS", "Tag")
    if os.path.isfile(tag_fn):
        f = open(tag_fn)
        line = string.strip(f.readline())
        f.close()
        if line[:1] == "T":
            return line[1:]
    return None


TRUE_VALUES = ('true', 'on', 'enabled')
FALSE_VALUES = ('false', 'off', 'disabled')

class OptionLookup:
    def __init__(self, dicts, branch=None):
        self._dicts = dicts
        self._replace = Replacer({
            "BRANCH": branch or "",
            "CVSROOT": os.environ.get("CVSROOT", ""),
            "HOSTNAME": os.environ.get("HOSTNAME") or getfqdn(),
            })

    def get(self, option):
        for dict in self._dicts:
            v = dict.get(option)
            if v is not None:
                return self._replace(v)
        return None

    def getbool(self, option):
        v = self.get(option)
        if v is not None:
            v = string.lower(v)
            if v in TRUE_VALUES:
                v = 1
            elif v in FALSE_VALUES:
                v = 0
            else:
                raise ValueError("illegal boolean value: %s" % `v`)
        return v

    def getint(self, option):
        v = self.get(option)
        if v is not None:
            v = int(v)
        return v

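    def getaddress(self, option):
        """Return (host, port) for a host:port or host string.

        The port, if omitted, will be None.

        Note: MAILHOST and MAILPORT are not defined in this script and
        nothing here calls this method; it appears to be shared with
        syncmail.
        """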
        v = self.get(option)
        if v is None:
            return MAILHOST, MAILPORT
        elif ":" in v:
            h, p = tuple(string.split(v, ":"))
            p = int(p)
            return h or MAILHOST, p
        else:
            return v, MAILPORT

# Support for $VARIABLE replacement.

class Replacer:
    def __init__(self, vars):
        self._vars = vars
        rx = re.compile(r"\$([a-zA-Z][a-zA-Z_]*\b|\{[a-zA-Z][a-zA-Z_]*\})")
        self._search = rx.search

    def __call__(self, v):
        v, name, suffix = self._split(v)
        while name:
            v = v + self._vars.get(name, "")
            prefix, name, suffix = self._split(suffix)
            v = v + prefix
        return v

    def _split(self, s):
        m = self._search(s)
        if m is not None:
            name = m.group(1)
            if name[0] == "{":
                name = name[1:-1]
            return s[:m.start()], name, s[m.end():]
        else:
            return s, None, ''

def get_section_as_dict(config, section):
    d = {}
    if config.has_section(section):
        for opt in config.options(section):
            d[opt] = config.get(section, opt, raw=1)
    return d

from ConfigParser import ConfigParser
class ConfigParser(ConfigParser):

    # Regular expressions for parsing section headers and options,
    # from the Python 2.3 version of ConfigParser.

    SECTCRE = re.compile(
        r'\['                        # [
        r'(?P<header>[^]]+)'         # very permissive!
        r'\]'                        # ]
        )

    # For compatibility with older versions:
    __SECTCRE = SECTCRE


class ConfigurationDatabase:

    def __init__(self, filename, cmdline, args):
        self.args = args
        self.cmdline = cmdline
        self.config = None
        if filename and os.path.isfile(filename):
            self.config = ConfigParser()
            if hasattr(self.config, "readfp"):
                self.config.readfp(open(filename), filename)
            else:
                # We have to use this old method for compatibility with
                # ancient versions of Python.
                self.config.read([filename])

    def get_config(self, branch):
        dicts = []
        if self.config:
            cp = self.config
            if branch:
                dicts.append(get_section_as_dict(cp, "branch " + branch))
                dicts.append(get_section_as_dict(cp, "branch"))
            dicts.append(self.cmdline)
            dicts.append(get_section_as_dict(cp, "general"))
        else:
            dicts.append(self.cmdline)
        dicts = filter(None, dicts)
        dicts.append(DEFAULT_CONFIGURATION)
        return Options(OptionLookup(dicts, branch), self.args)


class OptionsBase:

    def __init__(self, config, args):
        self.args = args
        self.verbose = config.getbool('verbose')
        fn = os.path.join("CVS", "Repository")
        if os.path.isfile(fn):
            f = open(fn)
            self.repodir = string.strip(f.readline())
            f.close()
        else:
            self.repodir = os.curdir


def get_admin_file(name):
    if os.environ.has_key("CVSROOT"):
        p = os.path.join(os.environ["CVSROOT"], "CVSROOT", name)
        return os.path.abspath(p)
    else:
        return os.path.join("CVSROOT", name)

def load_configuration(args):
    cmdline, args = load_cmdline(args)
    if cmdline.has_key('config-file'):
        cfgfile = get_admin_file(cmdline['config-file'])
        del cmdline['config-file']
    else:
        defconfig = get_admin_file(DEFAULT_CONFIGURATION_FILE)
        if os.path.isfile(defconfig):
            cfgfile = defconfig
        else:
            cfgfile = None
    return ConfigurationDatabase(cfgfile, cmdline, args)



class Options(OptionsBase):

    def __init__(self, config, args):
        OptionsBase.__init__(self, config, args)

        self.open = config.getbool("open")
        self.cross_commits = config.getbool("cross-branch-commits")

def load_cmdline(args):
    try:
        opts, args = getopt.getopt(args, 'hq',
                                   ['cvsroot=', 'config=', 'no-config',
                                    'help', 'quiet'])
    except getopt.error, msg:
        usage(2, msg)
    cmdline = {}
    def set(option, value, cmdline=cmdline):
        if cmdline.has_key(option):
            usage(2, "can't set option more than once")
        cmdline[option] = value
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)
        elif opt == '--cvsroot':
            os.environ["CVSROOT"] = arg
        elif opt == "--config":
            set('config-file', arg)
        elif opt == '--no-config':
            set('config-file', '')
        elif opt in ('-q', '--quiet'):
            set('verbose', 'false')
    return cmdline, args


def main():
    branch = load_branch_name()
    configdb = load_configuration(sys.argv[1:])
    changes = load_entries().values()

    d = {}
    for change in changes:
        tag = change.tagdate or branch
        if d.has_key(tag):
            d[tag].append(change)
        else:
            d[tag] = [change]
    denied = 0
    crossed_branches = len(d) > 1
    for tag in d.keys():
        config = configdb.get_config(tag)
        if tag:
            branchname = "branch " + tag
        else:
            branchname = "trunk"
        if not config.open:
            if not denied:
                denied = 1
                print
            print branchname, "is closed"
        if crossed_branches and not config.cross_commits:
            if not denied:
                denied = 1
                print
            print branchname, "does not permit cross-branch commits"
    if denied:
        print
        sys.exit(1)


if __name__ == '__main__':
    main()

--8bK25k7w9Y--