[Zope-Checkins] CVS: Packages/ZConfig - DataTypes.py:1.1.2.1 SchemaInfo.py:1.1.2.1 SchemaParser.py:1.1.4.1

Fred L. Drake, Jr. fred@zope.com
Fri, 6 Dec 2002 18:54:04 -0500


Update of /cvs-repository/Packages/ZConfig
In directory cvs.zope.org:/tmp/cvs-serv26703

Added Files:
      Tag: zconfig-schema-devel-branch
	DataTypes.py SchemaInfo.py SchemaParser.py 
Log Message:
Checkpoint: not working, needs tests.

=== Added File Packages/ZConfig/DataTypes.py ===
"""Selection of standard datatypes for ZConfig."""

import re


class TrivialConversion:
    """Datatype that exposes a conversion implemented as a function."""

    def __init__(self, conversion):
        self.convert = conversion


class MemoizedConversion:
    """Conversion helper that caches the results of expensive conversions."""

    def __init__(self, conversion):
        self._memo = {}
        self._conversion = conversion

    def convert(self, value):
        try:
            return self._memo[value]
        except KeyError:
            v = self._conversion(value)
            self._memo[value] = v
            return v


class RangeCheckedConversion:
    """Conversion helper that range checks another conversion."""

    def __init__(self, conversion, min=None, max=None):
        self._min = min
        self._max = max
        self._conversion = conversion

    def convert(self, value):
        v = self._conversion(value)
        if self._min is not None and v < self._min:
            raise ValueError("%s is below lower bound (%s)"
                             % (`v`, `self._min`))
        if self._max is not None and v > self._max:
            raise ValueError("%s is above upper bound (%s)"
                             % (`v`, `self._max`))
        return v


class RegularExpressionConversion:
    def __init__(self, regex):
        self._rx = re.compile(regex)

    def convert(self, value):
        m = self._rx.match(value)
        if m and m.group() == value:
            return value
        else:
            raise ValueError("value did not match regular expression: "
                             + `value`)


class Locale:
    def convert(self, value):
        locale = self._get_locale_module()
        # get current setting of locale
        prev = locale.setlocale(locale.LC_ALL)
        try:
            try:
                locale.setlocale(locale.LC_ALL, value)
            finally:
                locale.setlocale(locale.LC_ALL, prev)
        except locale.Error:
            raise ValueError(
                'The specified locale "%s" is not supported by your system.\n'
                'See your operating system documentation for more\n'
                'information on locale support.' % value
                )
        return value

    def _get_locale_module(self):
        try:
            return self._locale
        except AttributeError:
            try:
                import locale
            except ImportError:
                raise ValueError(
                    'The locale module could not be imported.\n'
                    'To use localization options, you must ensure\n'
                    'that the locale module is compiled into your\n'
                    'Python installation.'
                    )
            Locale._locale = locale
            return locale


class BasicKeyConversion(RegularExpressionConversion):
    def __init__(self):
        RegularExpressionConversion.__init__(self, "[a-zA-Z][-.a-zA-Z0-9]")

    def convert(self, value):
        return RegularExpressionConversion.convert(self, value).lower()


stock_datatypes = {
    "int":         TrivialConversion(int),
    "float":       TrivialConversion(float),
    "str":         TrivialConversion(str),
    "locale":      MemoizedConversion(Locale().convert),
    "port-number": RangeCheckedConversion(int, min=1, max=0xffff),
    "basic-key":   BasicKeyConversion(),
    }

class Registry:
    def __init__(self):
        self._stock = stock_datatypes.copy()
        self._other = {}

    def get(self, name):
        if name is None:
            name = "str"
        t = self._stock.get(name)
        if t is None:
            t = self._other.get(name)
            if t is None:
                t = self.search(name)
        return t

    def register(self, name, datatype):
        if self._stock.has_key(name):
            raise ValueError("datatype name conflicts with built-in types: "
                             + `name`)
        if self._other.has_key(name):
            raise ValueError("datatype name already registered:" + `name`)
        self._other[name] = datatype

    def search(self, name):
        if not "." in name:
            raise ValueError("unloadable datatype name: " + `name`)
        components = name.split('.')
        start = components[0]
        g = globals()
        package = __import__(start, g, g)
        modulenames = [start]
        for component in components[1:]:
            modulenames.append(component)
            try:
                package = getattr(package, component)
            except AttributeError:
                n = '.'.join(modulenames)
                package = __import__(n, g, g, component)
        datatype = package()
        self._other[name] = datatype
        return datatype


_r = Registry()
get = _r.get
register = _r.register


=== Added File Packages/ZConfig/SchemaInfo.py ===
"""Objects that can describe a ZConfig schema."""


class KeyInfo:
    """Information about a single configuration key."""

    def __init__(self, name, datatype, minOccurs, maxOccurs, handler):
        if maxOccurs < 1:
            raise ValueError("maxOccurs must be at least 1")
        if minOccurs < maxOccurs:
            raise ValueError("minOccurs must be at least maxOccurs")
        self.name = name
        self.datatype = datatype
        self.minOccurs = minOccurs
        self.maxOccurs = maxOccurs
        self.handler = handler
        self._default = None
        self._converted = False
        self._finished = False

    def finish(self):
        if self._finished:
            raise ValueError("cannot finish KeyInfo more than once")
        self._finished = True

    def adddefault(self, value):
        if self._finished:
            raise ValueError("cannot add default values to finished KeyInfo")
        if self.maxOccurs > 1:
            if self._default is None:
                self._default = [value]
            else:
                self._default.append(value)
        elif self._default is not None:
            raise ValueError("cannot set more than one default to key with"
                             " maxOccurs == 1")
        else:
            self._default = value

    def getdefault(self):
        if not self._finished:
            raise ValueError("cannot get default value of key before KeyInfo"
                             " has been completely initialized")
        if self._default is None and self.maxOccurs > 1:
            self._default = []
            self._converted = True
        if not self._converted:
            if self.maxOccurs > 1:
                self._default = map(self.datatype.convert, self._default)
            elif self._default is not None:
                self._default = self.datatype.convert(self._default)
            self._converted = True
        return self._default

    def ismulti(self):
        return self.maxOccurs > 1

    def isoptional(self):
        return self.minOccurs == 0

    def issection(self):
        return False


class SectionInfo(KeyInfo):
    def __init__(self, name, datatype, minOccurs=0, maxOccurs=1,
                 handler=None, keytype=None, names=None):
        assert keytype is not None
        self.keytype = keytype
        if names is None:
            names = "*",
        self.names = names
        self._children = []
        KeyInfo.__init__(self, name, datatype, minOccurs, maxOccurs,
                         handler)

    def _add_child(self, thing):
        for c in self._children:
            if name == c.name:
                raise ValueError("child name %s already used" % name)
        self._children.append(thing)

    def addkey(self, keyinfo):
        self._add_child(keyinfo.name, keyinfo)

    def getchildnames(self):
        return [c.name for c in self._children]

    def issection(self):
        return True

    def allowsUnnamed(self):
        return "*" in self.names

    def isAllowedName(self, name):
        if name == "*" or name == "+":
            return False
        elif "+" in self.names:
            return bool(name)
        else:
            return name in self.names


=== Added File Packages/ZConfig/SchemaParser.py ===
"""Parser for ZConfig schemas."""

import types
import xml.sax

import ZConfig
import ZConfig.DataTypes

from ZConfig.SchemaInfo import SectionInfo, KeyInfo


default_value_type = ZConfig.DataTypes.get("str")
default_key_type = ZConfig.DataTypes.get("basic-key")
default_name_type = default_key_type


class SchemaParser(xml.sax.ContentHandler):

    _cdata_tags = "description", "metadefault", "example", "default"
    _handled_tags = "schema", "key", "section", "sectiongroup"

    def __init__(self):
        self._cdata = None
        self._locator = None
        self._prefixes = []

    # SAX 2 ContentHandler methods

    def setDocumentLocator(self, locator):
        self._locator = locator

    def startElement(self, name, attrs):
        attrs = dict(attrs)
        if name == "schema":
            if self._schema:
                self.doSchemaError("schema element improperly nested")
            self.start_schema(attrs)
        elif name in self._handled_tags:
            if not self._schema:
                self.doSchemaError(name + " element outside of schema")
            getattr(self, "start_" + name)(attrs)
        elif name in self._cdata_tags:
            if not self._schema:
                self.doSchemaError(name + " element outside of schema")
            if self._cdata is not None:
                self.doSchemaError(name + " element improperly nested")
            self._cdata = []
        else:
            self.doSchemaError("Unknown tag " + name)

    def characters(self, data):
        if self._cdata is not None:
            self._cdata.append(data)
        elif data.strip():
            self.doSchemaError("unexpected non-blank character data: "
                               + data.strip())

    def endElement(self, name):
        if name in self._handled_tags:
            getattr(self, "end_" + name)()
        else:
            data = ''.join(self._cdata).strip()
            self._cdata = None
            if name == "default":
                # value for a key
                self._stack[-1].adddefault(data)
            else:
                setattr(self._stack[-1], name, data)

    def endDocument(self):
        if not self._schema:
            self.doSchemaError("no schema found")
        self.checkClasses()
        self.context.setSchema(self._schema)

    # schema loading logic

    def get_classname(self, name):
        if name.startswith(".") and self._prefixes:
            return self._prefixes[-1] + name
        else:
            return name

    def push_prefix(self, attrs):
        name = attrs.get("prefix")
        self._prefixes.append(self.get_classname(name) or "")

    def start_schema(self, attrs):
        self.start_section(attrs)
        assert len(self._stack) == 1
        self._schema = self._stack[0]

    def end_schema(self):
        del self._prefixes[-1]
        assert not self._prefixes
        del self._sections[-1]
        assert not self._sections

    def start_section(self, attrs):
        self.push_prefix(attrs)
        if attrs.has_key("keytype"):
            keytype = ZConfig.DataTypes.get(attrs["keytype"])
        else:
            keytype = default_key_type
        if attrs.has_key("nametype"):
            nametype = ZConfig.DataTypes.get(attrs["nametype"])
        else:
            nametype = default_name_type
        if attrs.has_key("names"):
            names = []
            for s in attrs["names"].split("|"):
                s = s.strip()
                if s != "*" and s != "+":
                    s = nametype.convert(s)
                    if s == "*" or s == "+":
                        self.doSchemaError("nametypes may not convert to"
                                           " '*' or '+'")
                names.append(s)
        else:
            names = None
        maxOccurs, minOccurs, handler = self.get_common_info(attrs)
        

    def end_section(self):
        del self._prefixes[-1]
        self._stack[-1].finish()

    def start_sectiongroup(self, attrs):
        self.push_prefix(attrs)

    def end_sectiongroup(self):
        del self._prefixes[-1]
        self._stack.pop().finish()

    def start_key(self, attrs):
        name = attrs.get("name")
        if not name:
            self.doSchemaError("key name may not be omitted or empty")
        # run the keytype converter to make sure this is a valid key
        name = self._stack[-1].keytype.convert(name)
        datatype = ZConfig.DataTypes.get(attrs.get("type"))
        maxOccurs, minOccurs, handler = self.get_common_info(attrs)
        key = KeyInfo(name, datatype, minOccurs, maxOccurs, handler)
        self._sections[-1].addkey(key)
        self._stack.append(key)

    def end_key(self):
        self._stack.pop().finish()

    def get_common_info(self, attrs):
        maxOccurs = attrs.get("maxOccurs")
        if maxOccurs:
            maxOccurs = int(maxOccurs)
        minOccurs = attrs.get("minOccurs")
        if minOccurs:
            minOccurs = int(minOccurs)
        handler = attrs.get("handler")
        if handler:
            handler = self.get_classname(handler)
        return maxOccurs, minOccurs, handler