[Zope-CVS] CVS: Products/Basket - pkg_resources.py:1.1
Chris McDonough
chrism at plope.com
Wed Nov 9 14:42:13 EST 2005
Update of /cvs-repository/Products/Basket
In directory cvs.zope.org:/tmp/cvs-serv21914
Added Files:
pkg_resources.py
Log Message:
Add pkg_resources.py (2.3 version).
=== Added File Products/Basket/pkg_resources.py === (1814/2214 lines abridged)
"""Package resource API
--------------------
A resource is a logical file contained within a package, or a logical
subdirectory thereof. The package resource API expects resource names
to have their path parts separated with ``/``, *not* whatever the local
path separator is. Do not use os.path operations to manipulate resource
names being passed into the API.
The package resource API is designed to work with normal filesystem packages,
.egg files, and unpacked .egg files. It can also work in a limited way with
.zip files and with custom PEP 302 loaders that support the ``get_data()``
method.
"""
import sys, os, zipimport, time, re, imp, new
from sets import ImmutableSet
# Public names exported by ``from <this module> import *``, grouped by purpose.
__all__ = [
    # Basic resource access and distribution/entry point discovery
    'require', 'run_script', 'get_provider', 'get_distribution',
    'load_entry_point', 'get_entry_map', 'get_entry_info', 'iter_entry_points',
    'resource_string', 'resource_stream', 'resource_filename',
    'resource_listdir', 'resource_exists', 'resource_isdir',

    # Environmental control
    'declare_namespace', 'working_set', 'add_activation_listener',
    'find_distributions', 'set_extraction_path', 'cleanup_resources',
    'get_default_cache',

    # Primary implementation classes
    'Environment', 'WorkingSet', 'ResourceManager',
    'Distribution', 'Requirement', 'EntryPoint',

    # Exceptions
    'ResolutionError','VersionConflict','DistributionNotFound','UnknownExtra',

    # Parsing functions and string utilities
    'parse_requirements', 'parse_version', 'safe_name', 'safe_version',
    'get_platform', 'compatible_platforms', 'yield_lines', 'split_sections',
    'safe_extra',

    # filesystem utilities
    'ensure_directory', 'normalize_path',

    # Distribution "precedence" constants
    'EGG_DIST', 'BINARY_DIST', 'SOURCE_DIST', 'CHECKOUT_DIST',

    # "Provider" interfaces, implementations, and registration/lookup APIs
    'IMetadataProvider', 'IResourceProvider',
    'PathMetadata', 'EggMetadata', 'EmptyProvider', 'empty_provider',
    'NullProvider', 'EggProvider', 'DefaultProvider', 'ZipProvider',
    'register_finder', 'register_namespace_handler', 'register_loader_type',
    'fixup_namespace_packages', 'get_importer',

    # Deprecated/backward compatibility only
    'run_main', 'AvailableDistributions',
]
class ResolutionError(Exception):
    """Common base class for every dependency resolution failure"""
    pass


class VersionConflict(ResolutionError):
    """The version already installed conflicts with the one requested"""
    pass


class DistributionNotFound(ResolutionError):
    """No distribution satisfying the request could be located"""
    pass


class UnknownExtra(ResolutionError):
    """The distribution has no "extra feature" with the requested name"""
    pass
# Registry mapping a PEP 302 loader type to the factory that builds a
# provider for modules loaded by it; filled in by ``register_loader_type()``
# and consulted by ``get_provider()``.
_provider_factories = {}

# "major.minor" version string of the running interpreter.  Built from
# ``sys.version_info`` instead of slicing ``sys.version``: a three-character
# slice truncates two-digit minor versions ("3.10..." would yield "3.1").
PY_MAJOR = '%d.%d' % sys.version_info[:2]

# Distribution "precedence" constants
EGG_DIST = 3
BINARY_DIST = 2
SOURCE_DIST = 1
CHECKOUT_DIST = 0
def register_loader_type(loader_type, provider_factory):
    """Associate a provider factory with a PEP 302 loader type

    `loader_type` is the type or class of a module's ``__loader__``.
    `provider_factory` is a callable that is handed a *module* object and
    returns an ``IResourceProvider`` for that module.  A later registration
    for the same `loader_type` replaces the earlier one.
    """
    _provider_factories.update({loader_type: provider_factory})
def get_provider(moduleOrReq):
    """Return an IResourceProvider for the named module or requirement

    A ``Requirement`` is answered from the master working set, falling back
    to the first result of ``require()`` when nothing matching is active yet.
    Anything else is treated as a module name: the module is imported if it
    is not already in ``sys.modules``, and the provider factory registered
    for its ``__loader__`` (``None`` if it has none) is applied to it.
    """
    if isinstance(moduleOrReq, Requirement):
        active = working_set.find(moduleOrReq)
        return active or require(str(moduleOrReq))[0]

    if moduleOrReq not in sys.modules:
        __import__(moduleOrReq)
    mod = sys.modules[moduleOrReq]

    factory = _find_adapter(_provider_factories, getattr(mod, '__loader__', None))
    return factory(mod)
def _macosx_vers(_cache=[]):
    """Return the Mac OS X product version as a list of string components

    Runs ``/usr/bin/sw_vers`` and splits the value found on its
    ``ProductVersion:`` line at each ".".  The mutable `_cache` default is
    shared between calls on purpose: it memoizes the answer so the command
    is run at most once per process.

    Raises ``ValueError`` if the output has no ``ProductVersion:`` line
    (``get_platform()`` relies on that to detect non-Mac darwin systems).
    """
    if not _cache:
        pipe = os.popen('/usr/bin/sw_vers')
        try:
            info = pipe.read().splitlines()
        finally:
            # close explicitly instead of leaving the pipe to the collector
            pipe.close()
        for line in info:
            fields = line.split(None, 1)
            if len(fields) != 2:
                # blank or value-less line: it cannot carry the version, and
                # unpacking it would abort the scan before the real line
                continue
            key, value = fields
            if key == 'ProductVersion:':
                _cache.append(value.strip().split("."))
                break
        else:
            raise ValueError("What?!")
    return _cache[0]
def _macosx_arch(machine):
    """Map the PowerPC machine names to 'ppc'; pass anything else through"""
    aliases = {'PowerPC': 'ppc', 'Power_Macintosh': 'ppc'}
    if machine in aliases:
        return aliases[machine]
    return machine
def get_platform():
    """Return the platform string used for platform-specific distributions

    XXX Apart from the Mac OS X special case this is simply
    ``distutils.util.get_platform()``; it still needs hacks for Linux.
    """
    if sys.platform == "darwin":
        try:
            vers = _macosx_vers()
            arch = _macosx_arch(os.uname()[4].replace(" ", "_"))
            return "macosx-%d.%d-%s" % (int(vers[0]), int(vers[1]), arch)
        except ValueError:
            # a darwin system that is not Mac OS X: use the generic answer
            pass

    from distutils.util import get_platform as _distutils_platform
    return _distutils_platform()
# Platform strings of the form "macosx-<major>.<minor>-<machine>", i.e. the
# shape produced by get_platform() on Mac OS X; groups: major, minor, machine.
macosVersionString = re.compile(r"macosx-(\d+)\.(\d+)-(.*)")
# Platform strings of the form "darwin-<n>.<n>.<n>-<rest>"; four groups, the
# first being the leading (major) number.
darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)")
def compatible_platforms(provided,required):
"""Can code for the `provided` platform run on the `required` platform?
Returns true if either platform is ``None``, or the platforms are equal.
XXX Needs compatibility checks for Linux and Mac OS X.
"""
if provided is None or required is None or provided==required:
return True # easy case
# Mac OS X special cases
reqMac = macosVersionString.match(required)
if reqMac:
provMac = macosVersionString.match(provided)
# is this a Mac package?
if not provMac:
# this is backwards compatibility for packages built before
# setuptools 0.6. All packages built after this point will
# use the new macosx designation.
provDarwin = darwinVersionString.match(provided)
if provDarwin:
dversion = int(provDarwin.group(1))
macosversion = "%s.%s" % (reqMac.group(1), reqMac.group(2))
if dversion == 7 and macosversion >= "10.3" or \
dversion == 8 and macosversion >= "10.4":
#import warnings
#warnings.warn("Mac eggs should be rebuilt to "
# "use the macosx designation instead of darwin.",
# category=DeprecationWarning)
return True
return False # egg isn't macosx or legacy darwin
# are they the same major version and machine type?
[-=- -=- -=- 1814 lines omitted -=- -=- -=-]
project_name = match.group(1)
p = match.end()
extras = []
match = OBRACKET(line,p)
if match:
p = match.end()
line, p, extras = scan_list(
DISTRO, CBRACKET, line, p, (1,), "'extra' name"
)
line, p, specs = scan_list(VERSION,LINE_END,line,p,(1,2),"version spec")
specs = [(op,val.replace('_','-')) for op,val in specs]
yield Requirement(project_name.replace('_','-'), specs, extras)
def _sort_dists(dists):
    """Reorder the list `dists` in place, highest ``hashcmp`` first"""
    decorated = []
    for dist in dists:
        decorated.append((dist.hashcmp, dist))
    decorated.sort()        # ascending by hashcmp ...
    decorated.reverse()     # ... flipped so the largest key leads
    dists[:] = [pair[1] for pair in decorated]
class Requirement:
    """A project name with optional version specifiers and extras

    Instances are normally obtained from ``Requirement.parse()`` or
    ``parse_requirements()``.  Attributes set by the constructor:

    ``project_name``  the name as given
    ``key``           the lower-cased name, used for matching
    ``specs``         list of ``(op, version)`` pairs, ordered by version
    ``index``         list of ``(parsed_version, transitions, op, version)``
    ``extras``        tuple of extras, each passed through ``safe_extra()``
    ``hashCmp``       the value that equality and hashing are based on
    """

    def __init__(self, project_name, specs=(), extras=()):
        """Build a requirement from a name, ``(op, version)`` pairs and extras"""
        self.project_name = project_name
        self.key = project_name.lower()
        # One row per spec; sorting puts the rows in parsed-version order,
        # which is the order ``__contains__`` evaluates them in.
        index = [(parse_version(v),state_machine[op],op,v) for op,v in specs]
        index.sort()
        self.specs = [(op,ver) for parsed,trans,op,ver in index]
        self.index, self.extras = index, tuple(map(safe_extra,extras))
        self.hashCmp = (
            self.key, tuple([(op,parsed) for parsed,trans,op,ver in index]),
            ImmutableSet(self.extras)
        )
        self.__hash = hash(self.hashCmp)

    def __str__(self):
        """Return ``name[extra,...]spec,...`` for this requirement"""
        specs = ','.join([''.join(s) for s in self.specs])
        extras = ','.join(self.extras)
        if extras: extras = '[%s]' % extras
        return '%s%s%s' % (self.project_name, extras, specs)

    def __repr__(self): return "Requirement.parse(%r)" % str(self)

    def __eq__(self,other):
        """Requirements are equal when their ``hashCmp`` values are equal"""
        return isinstance(other,Requirement) and self.hashCmp==other.hashCmp

    def __ne__(self,other):
        """Inverse of ``__eq__``

        Python 2 does not derive ``!=`` from ``__eq__``; without this method
        two equal requirements would still compare as "not equal".
        """
        return not self.__eq__(other)

    def __contains__(self,item):
        """Does `item` satisfy this requirement's version specifiers?

        `item` may be a ``Distribution`` (its ``key`` must match and its
        ``parsed_version`` is tested), a version string (parsed first), or
        any other object, which is compared to the parsed versions as-is.
        """
        if isinstance(item,Distribution):
            if item.key != self.key: return False
            item = item.parsed_version
        elif isinstance(item,basestring):
            item = parse_version(item)
        last = None
        for parsed,trans,op,ver in self.index:
            # cmp() yields 0, 1 or -1, selecting the "=", ">" or "<" column
            # of this operator's row in ``state_machine``
            action = trans[cmp(item,parsed)]
            if action=='F':     return False
            elif action=='T':   return True
            elif action=='+':   last = True
            elif action=='-' or last is None:   last = False
        if last is None: last = True    # no rules encountered
        return last

    def __hash__(self):
        return self.__hash

    #@staticmethod
    def parse(s):
        """Parse `s` into exactly one ``Requirement``

        Raises ``ValueError`` if `s` yields no requirement or more than one.
        """
        reqs = list(parse_requirements(s))
        if reqs:
            if len(reqs)==1:
                return reqs[0]
            raise ValueError("Expected only one requirement", s)
        raise ValueError("No requirements found", s)

    parse = staticmethod(parse)
# Transition table consulted by ``Requirement.__contains__``.  Each operator
# maps to a three-character row that is indexed with
# ``cmp(candidate, spec_version)``: index 0 when equal, 1 when the candidate
# is greater, and -1 (the last character) when it is less -- hence the "=><"
# column order.  Action codes as interpreted there: 'T' accepts immediately,
# 'F' rejects immediately, '+' records a tentative accept, '-' records a
# tentative reject; any other code ('.') records a tentative reject only if
# no earlier spec has recorded a result yet.
state_machine = {
    #       =><
    '<' :  '--T',
    '<=':  'T-T',
    '>' :  'F+F',
    '>=':  'T+F',
    '==':  'T..',
    '!=':  'F++',
}
def _get_mro(cls):
    """Return the method resolution order of a type or classic class"""
    if isinstance(cls, type):
        return cls.__mro__
    # A classic class has no ``__mro__``: derive a throwaway new-style
    # subclass from it and drop that subclass from the front of its MRO.
    class _Probe(cls, object): pass
    return _Probe.__mro__[1:]
def _find_adapter(registry, ob):
    """Look up the adapter factory registered for the class of `ob`

    The classes in the MRO of `ob`'s class are tried in order and the entry
    for the first one present in `registry` is returned; ``None`` is
    returned when none of them is registered.
    """
    klass = getattr(ob, '__class__', type(ob))
    matches = [base for base in _get_mro(klass) if base in registry]
    if matches:
        return registry[matches[0]]
    return None
def ensure_directory(path):
    """Ensure that the parent directory of `path` exists

    Missing intermediate directories are created as needed.  Nothing is done
    when `path` has no directory component (its parent is then the current
    directory) or when the parent already exists.

    Raises ``OSError`` if the parent cannot be created.
    """
    dirname = os.path.dirname(path)
    if not dirname or os.path.isdir(dirname):
        # '' would make os.makedirs() fail although there is nothing to do
        return
    try:
        os.makedirs(dirname)
    except OSError:
        # Somebody else may have created it between the check and the call;
        # only a directory that is still missing is a real failure.
        if not os.path.isdir(dirname):
            raise
def split_sections(s):
    """Yield (section, content) pairs from a string or iterable of strings

    ``section`` is the text between the brackets of a "[section]" header,
    stripped; ``content`` is the list of lines that ``yield_lines()``
    produced for that section.  Lines that come before the first header are
    reported under a ``section`` of ``None``.

    Raises ``ValueError`` for a line that starts with "[" but does not end
    with "]".
    """
    header = None
    body = []
    for text in yield_lines(s):
        if not text.startswith("["):
            body.append(text)
            continue
        if not text.endswith("]"):
            raise ValueError("Invalid section heading", text)
        # a new header closes the segment collected so far, if there is one
        if header or body:
            yield header, body
        header = text[1:-1].strip()
        body = []

    # wrap up last segment
    yield header, body
# Create the global resource manager and re-export its public attributes
# as module-level names
_manager = ResourceManager()

def _initialize(g):
    """Copy every public attribute of ``_manager`` into the namespace `g`"""
    exported = [attr for attr in dir(_manager) if not attr.startswith('_')]
    for attr in exported:
        g[attr] = getattr(_manager, attr)

_initialize(globals())
# Prepare the master working set and make the ``require()`` API available
working_set = WorkingSet()
try:
    # Does the main program list any requirements?
    from __main__ import __requires__
except ImportError:
    pass # No: just use the default working set based on sys.path
else:
    # Yes: ensure the requirements are met, by prefixing sys.path if necessary
    try:
        working_set.require(__requires__)
    except VersionConflict:     # try it without defaults already on sys.path
        working_set = WorkingSet([])    # by starting with an empty path
        # resolve the requirements against a fresh Environment and add each
        # resulting distribution to the (initially empty) working set
        for dist in working_set.resolve(
            parse_requirements(__requires__), Environment()
        ):
            working_set.add(dist)
        for entry in sys.path:  # add any missing entries from sys.path
            if entry not in working_set.entries:
                working_set.add_entry(entry)
        sys.path[:] = working_set.entries   # then copy back to sys.path

# Publish bound methods of the master working set as the module-level API.
# These are bound only after the block above, which may have replaced
# ``working_set`` with a new instance.
require = working_set.require
iter_entry_points = working_set.iter_entry_points
add_activation_listener = working_set.subscribe
run_script = working_set.run_script
run_main = run_script   # backward compatibility

# Activate all distributions already on sys.path, and ensure that
# all distributions added to the working set in the future (e.g. by
# calling ``require()``) will get activated as well.
add_activation_listener(lambda dist: dist.activate())
# Clear the entry list and re-add every sys.path item in sys.path order.
# map() is called only for its side effects here.
# NOTE(review): this relies on Python 2's eager map(); a lazy map() would
# never call add_entry.
working_set.entries=[]; map(working_set.add_entry,sys.path) # match order
More information about the Zope-CVS
mailing list