[Zope-CVS] CVS: Products/Basket - basket.py:1.1 unzip.py:1.1
__init__.py:1.23
Chris McDonough
chrism at plope.com
Thu Nov 10 13:21:59 EST 2005
Update of /cvs-repository/Products/Basket
In directory cvs.zope.org:/tmp/cvs-serv15635
Modified Files:
__init__.py
Added Files:
basket.py unzip.py
Log Message:
Break out Basket class and support functions into a basket.py module.
If an egg is marked explicitly as "not-zip-safe", unzip it to a tempdir and add the tempdir to sys.path.
=== Added File Products/Basket/basket.py ===
import sys
import os
import traceback
import textwrap
import atexit
import shutil
import tempfile
import unzip
import zLOG
import App
from utils import EggProductContext
from utils import EggProduct
import pkg_resources
# Name of the entry point group this module works with: it is the group
# passed to pkg_resources.iter_entry_points() when products are initialized,
# and the entry_points.txt section name that is_product_distribution()
# looks for.
entrypoint_group = 'zope2.initialize'
class Basket(object):
    """ Finds egg distributions that declare the 'zope2.initialize' entry
    point group, adds them to the pkg_resources working set and installs
    each entry point as a product. """

    def __init__(self):
        self.pre_initialized = False
        # temporary directories created by preinitialize() for eggs that
        # are not zip-safe; removed again by cleanup()
        self.tempdirs = []
        # NOTE(review): automatic cleanup at interpreter exit is disabled
        # here, so cleanup() has to be called explicitly -- confirm whether
        # that is intended.
        #atexit.register(self.cleanup)
        try:
            etc = os.path.join(INSTANCE_HOME, 'etc')
        except NameError: # INSTANCE_HOME may not be available?
            etc = ''
        self.pdist_fname = os.path.join(etc, 'PRODUCT_DISTRIBUTIONS.txt')

    def require(self, distro_str):
        """ Specifically require a distribution specification """
        pkg_resources.require(distro_str)

    def parse_product_distributions_file(self, fp):
        """ Return the non-blank lines of fp (any iterable of strings),
        each stripped of surrounding whitespace, in their original order """
        L = []
        for distro_str in fp:
            distro_str = distro_str.strip()
            if distro_str:
                L.append(distro_str)
        return L

    def initialize(self, context):
        """ Register the EggProduct class with the given product context,
        then install one EggProductContext per 'zope2.initialize' entry
        point.

        Returns a list holding whatever each EggProductContext.install()
        call returned.  An entry point whose containing package cannot be
        determined is logged and skipped, or re-raised when the Zope
        configuration is in debug mode. """
        context.registerClass(EggProduct, constructors = (('dummy',None),),
                              visibility=None, icon='icon_egg.gif')
        # Grab app from Zope product context
        # It's a "protected" attribute, hence the name mangling
        app = context._ProductContext__app
        debug_mode = App.config.getConfiguration().debug_mode

        if not self.pre_initialized: # this services unit testing
            self.preinitialize()

        data = []
        points = pkg_resources.iter_entry_points(entrypoint_group)

        for point in points:
            # XXX deal with duplicate product names by raising an exception
            # somewhere in here.
            eggname = ' '.join(textwrap.wrap(point.dist.location, 80))
            try:
                product_pkg = get_containing_package(point.module_name)
            except:
                # deliberately broad: any failure while importing a product
                # is logged so that one broken egg does not stop the others
                zLOG.LOG('Egg Product Init', zLOG.ERROR,
                         'Problem initializing product with entry point '
                         '"%s" in module "%s"' % (point.name,point.module_name),
                         error=sys.exc_info())
                if debug_mode:
                    raise
                else:
                    continue

            productname = product_pkg.__name__.split('.')[-1]
            initializer = get_initializer(point, productname, debug_mode)
            # use a separate name so the 'context' argument is not rebound
            egg_context = EggProductContext(productname, initializer, app,
                                            product_pkg, eggname)
            returned = egg_context.install(debug_mode)
            data.append(returned)
        return data

    def product_distributions_by_dwim(self):
        """ Return all product distributions which have an appropriate
        entry point group on sys.path """
        environment = pkg_resources.Environment()
        product_distros = []
        for project_name in environment:
            distributions = environment[project_name]
            for distribution in distributions:
                if is_product_distribution(distribution):
                    product_distros.append(distribution)
        return product_distros

    def product_distributions_by_require(self):
        """ Return all product distributions which are listed in the
        product distributions file """
        pdist_file = open(self.pdist_fname, 'r')
        try:
            strings = self.parse_product_distributions_file(pdist_file)
        finally:
            # close the handle even if reading the file fails
            pdist_file.close()
        product_distros = []
        for string in strings:
            distribution = pkg_resources.get_distribution(string)
            if is_product_distribution(distribution):
                product_distros.append(distribution)
            else:
                zLOG.LOG('Egg Product Init',
                         zLOG.ERROR,
                         'A requirement was listed in %s that is not a Zope '
                         'product package: %s' % (self.pdist_fname, string))
        return product_distros

    def preinitialize(self):
        """ Add every product distribution to the pkg_resources working set.

        The distributions come from the product distributions file when it
        exists, otherwise from scanning the environment.  A distribution
        that is not zip-safe is first unpacked into a fresh temporary
        directory and the unpacked copy is added in its place. """
        pdist_fname = self.pdist_fname
        if pdist_fname and os.path.exists(pdist_fname):
            distributions = self.product_distributions_by_require()
        else:
            distributions = self.product_distributions_by_dwim()

        for distribution in distributions:
            if is_zip_safe_distribution(distribution):
                pkg_resources.working_set.add(distribution)
            else:
                # if it's not zip-safe, blast it out to a tempdir and create
                # new distro out of the file-based egg; the tempdir is
                # recorded right away so cleanup() can remove it even if
                # unpacking fails part way through
                tempdir = tempfile.mkdtemp()
                self.tempdirs.append(tempdir)
                eggname = os.path.basename(distribution.location)
                eggdir = os.path.join(tempdir, eggname)
                os.makedirs(eggdir)
                un = unzip.unzip()
                un.extract(distribution.location, eggdir)
                new_distro = pkg_resources.Distribution.from_filename(eggdir)
                # XXX this is nasty... create an API for this
                # Forget the zipped egg before adding its unpacked copy.
                # Each removal is guarded because the zipped location may
                # never have been placed on sys.path or in the working set.
                working_set = pkg_resources.working_set
                old_location = distribution.location
                if old_location in working_set.entries:
                    working_set.entries.remove(old_location)
                working_set.by_key.pop(distribution.key, None)
                working_set.entry_keys[old_location] = []
                if old_location in sys.path:
                    sys.path.remove(old_location)
                working_set.add(new_distro)

        self.pre_initialized = True

    def cleanup(self):
        """ Remove the temporary directories created by preinitialize(),
        ignoring any errors raised while deleting them """
        for tempdir in self.tempdirs:
            shutil.rmtree(tempdir, ignore_errors=True)
def get_containing_package(module_name):
    """ Import module_name and return the nearest enclosing package.

    The dotted name is imported and, if the resulting module has no
    __path__ attribute (i.e. it is not a package), its parent name is
    tried next, and so on up the dotted path.

    Raises ValueError when no module along the dotted path is a package.
    Exceptions raised by the imports themselves propagate unchanged. """
    name = module_name
    while name:
        __import__(name)
        module = sys.modules[name]
        if hasattr(module, '__path__'):
            return module
        # drop the last dotted component; a top-level name becomes ''
        name = '.'.join(name.split('.')[:-1])
    # The previous recursive form ended up calling __import__('') here,
    # which failed with an uninformative "Empty module name" ValueError;
    # the exception type is kept so existing callers behave the same.
    raise ValueError('module %r is not contained in a package'
                     % (module_name,))
def _loaded_package_for(module_name):
    """ Return the nearest already-imported package enclosing module_name,
    or None; only sys.modules is consulted, nothing is imported """
    name = module_name
    while name:
        module = sys.modules.get(name)
        if module is not None and hasattr(module, '__path__'):
            return module
        name = '.'.join(name.split('.')[:-1])
    return None

def get_initializer(point, productname, debug_mode, product_pkg=None):
    """ Load and return the initializer object an entry point refers to.

    point -- entry point object; its load() method supplies the initializer
    productname -- product name, used only in the log message
    debug_mode -- when true, a load failure is re-raised after being logged
    product_pkg -- optional package on which to record the formatted
                   traceback of a failure as __import_error__; when omitted,
                   the nearest already-imported package enclosing
                   point.module_name is used, if there is one

    Returns the loaded initializer, or None when loading failed and
    debug_mode is false. """
    initializer = None
    try:
        # this will raise an import error if the initializer can't
        # be imported (presumably because of a module-scope error)
        initializer = point.load()
    except:
        # deliberately broad: anything raised while loading is logged
        exc = sys.exc_info()
        zLOG.LOG('Zope', zLOG.ERROR, 'Could not import %s' % productname,
                 error=exc)
        if product_pkg is None:
            product_pkg = _loaded_package_for(
                getattr(point, 'module_name', ''))
        if product_pkg is not None:
            # same text traceback.print_exc(100, f) would have written
            product_pkg.__import_error__ = ''.join(
                traceback.format_exception(exc[0], exc[1], exc[2], 100))
        if debug_mode:
            raise
    return initializer
def is_product_distribution(distribution):
    """ Return True if the distribution's entry_points.txt metadata has a
    section named after entrypoint_group, False otherwise (including when
    the distribution has no entry_points.txt metadata at all) """
    entry_meta = 'entry_points.txt'
    if not distribution.has_metadata(entry_meta):
        return False
    inifile = distribution.get_metadata(entry_meta)
    for section, content in pkg_resources.split_sections(inifile):
        if section == entrypoint_group:
            return True
    # be explicit rather than falling off the end and returning None
    return False
def is_zip_safe_distribution(distribution):
    """ A distribution counts as zip-safe unless it carries
    'not-zip-safe' metadata """
    if distribution.has_metadata('not-zip-safe'):
        return False
    return True
=== Added File Products/Basket/unzip.py ===
""" unzip.py
Version: 1.1
Extract a zipfile to the directory provided
It first creates the directory structure to house the files
then it extracts the files to it.
Sample usage:
command line
unzip.py -p 10 -z c:\testfile.zip -o c:\testoutput
python class
import unzip
un = unzip.unzip()
un.extract(r'c:\testfile.zip', r'c:\testoutput')
By Doug Tolton
"""
import sys
import zipfile
import os
import os.path
import getopt
class unzip:
    """ Extracts the contents of a zip archive into a directory. """

    def extract(self, file, dir):
        """ Extract every file member of the zip archive 'file' into 'dir'.

        'dir' is created if it does not exist (unless its name ends with
        ':'), then the directories named by the archive members are
        created, then each member whose name does not end in '/' is
        written out in binary mode.

        NOTE(review): member names are joined onto 'dir' unchecked, so an
        archive containing '..' components or absolute names could write
        outside 'dir'; only use this on trusted archives. """
        if not dir.endswith(':') and not os.path.exists(dir):
            os.mkdir(dir)

        # create directory structure to house files
        self._createstructure(file, dir)

        zf = zipfile.ZipFile(file)
        try:
            # extract files to directory structure
            for name in zf.namelist():
                if name.endswith('/'):
                    # directory entry; already created above
                    continue
                outfile = open(os.path.join(dir, name), 'wb')
                try:
                    outfile.write(zf.read(name))
                finally:
                    # close the output even when the write fails
                    outfile.close()
        finally:
            zf.close()

    def _createstructure(self, file, dir):
        """ Create under 'dir' every directory used by the archive 'file' """
        self._makedirs(self._listdirs(file), dir)

    def _makedirs(self, directories, basedir):
        """ Create any directories that don't currently exist """
        for dir in directories:
            curdir = os.path.join(basedir, dir)
            if not os.path.exists(curdir):
                os.makedirs(curdir)

    def _listdirs(self, file):
        """ Grabs all the directories in the zip structure
        This is necessary to create the structure before trying
        to extract the file to it.

        Returns a sorted list with one directory name per archive member
        that has one, so the same name can appear more than once. """
        zf = zipfile.ZipFile(file)
        try:
            names = zf.namelist()
        finally:
            zf.close()

        dirs = []
        for name in names:
            dirname = os.path.dirname(name)
            if dirname:
                dirs.append(dirname)

        dirs.sort()
        return dirs
=== Products/Basket/__init__.py 1.22 => 1.23 ===
--- Products/Basket/__init__.py:1.22 Wed Nov 9 22:13:52 2005
+++ Products/Basket/__init__.py Thu Nov 10 13:21:28 2005
@@ -1,7 +1,4 @@
import sys
-import os
-import traceback
-import textwrap
# pkg_resource monkeypatching (if necessary) needs to happen before
# Products.Basket.utils is imported
@@ -14,153 +11,7 @@
import pkg_resources_0_6a7 as pkg_resources
sys.modules['pkg_resources'] = pkg_resources
-from Products.Basket.utils import EggProductContext
-from Products.Basket.utils import EggProduct
-import zLOG
-import App
-
-entrypoint_group = 'zope2.initialize'
-
-class Basket(object):
- def __init__(self):
- self.pre_initialized = False
-
- def require(self, distro_str):
- """ Specifically require a distribution specification """
- pkg_resources.require(distro_str)
-
- def parse_product_distributions_file(self, fp):
- L = []
- for distro_str in fp:
- distro_str = distro_str.strip()
- if distro_str:
- L.append(distro_str)
- return L
-
- def initialize(self, context):
- context.registerClass(EggProduct, constructors = (('dummy',None),),
- visibility=None, icon='icon_egg.gif')
- # Grab app from Zope product context
- # It's a "protected" attribute, hence the name mangling
- app = context._ProductContext__app
- debug_mode = App.config.getConfiguration().debug_mode
-
- if not self.pre_initialized:
- try:
- etc = os.path.join(INSTANCE_HOME, 'etc')
- except NameError: # INSTANCE_HOME may not be available
- etc = ''
- pdist_fname = os.path.join(etc, 'PRODUCT_DISTRIBUTIONS.txt')
- self.preinitialize(pdist_fname)
-
- data = []
- points = pkg_resources.iter_entry_points(entrypoint_group)
- meta_types = []
-
- for point in points:
- # XXX deal with duplicate product names by raising an exception
- # somewhere in here.
- eggname = ' '.join(textwrap.wrap(point.dist.location, 80))
- try:
- product_pkg = get_containing_package(point.module_name)
- except:
- zLOG.LOG('Egg Product Init', zLOG.ERROR,
- 'Problem initializing product with entry point '
- '"%s" in module "%s"' % (point.name,point.module_name),
- error=sys.exc_info())
- if debug_mode:
- raise
- else:
- continue
-
- productname = product_pkg.__name__.split('.')[-1]
- initializer = get_initializer(point, productname, debug_mode)
- context = EggProductContext(productname, initializer, app,
- product_pkg, eggname)
- returned = context.install(debug_mode)
- data.append(returned)
- return data
-
- def product_distributions_by_dwim(self):
- """ Return all product distributions which have an appropriate
- entry point group on sys.path """
- environment = pkg_resources.Environment()
- product_distros = []
- for project_name in environment:
- distributions = environment[project_name]
- for distribution in distributions:
- if is_product_distribution(distribution):
- product_distros.append(distribution)
- return product_distros
-
- def product_distributions_by_require(self, pdist_fname):
- """ Return all product distributions which are listed in the
- product distributions file """
- pdist_file = open(pdist_fname, 'r')
- strings = self.parse_product_distributions_file(pdist_file)
- product_distros = []
- for string in strings:
- distribution = pkg_resources.get_distribution(string)
- if is_product_distribution(distribution):
- product_distros.append(distribution)
- else:
- zLOG.LOG('Egg Product Init',
- zLOG.ERROR,
- 'A requirement was listed in %s that is not a Zope '
- 'product package: %s' % (pdist_fname, string))
- return product_distros
-
- def preinitialize(self, pdist_fname=None):
- if pdist_fname and os.path.exists(pdist_fname):
- distributions = self.product_distributions_by_require(pdist_fname)
- else:
- distributions = self.product_distributions_by_dwim()
-
- for distribution in distributions:
- pkg_resources.working_set.add(distribution)
-
- self.pre_initialized = True
-
-def get_containing_package(module_name):
- __import__(module_name)
- thing = sys.modules[module_name]
- if hasattr(thing, '__path__'):
- return thing
- new = '.'.join(module_name.split('.')[:-1])
- if new == module_name:
- return None
- return get_containing_package(new)
-
-def get_initializer(point, productname, debug_mode):
- initializer = None
- try:
- # this will raise an import error if the initializer can't
- # be imported (presumably because of a module-scope error)
- initializer = point.load()
- except:
- exc = sys.exc_info()
- zLOG.LOG('Zope', zLOG.ERROR, 'Could not import %s' % productname,
- error=exc)
- f = StringIO()
- traceback.print_exc(100, f)
- product_pkg.__import_error__ = f.getvalue()
- if debug_mode:
- raise exc[0], exc[1], exc[2]
- return initializer
-
-def is_product_distribution(distribution):
- entry_meta = 'entry_points.txt'
- if distribution.has_metadata(entry_meta):
- inifile = distribution.get_metadata(entry_meta)
- sections = pkg_resources.split_sections(inifile)
- for section, content in sections:
- if section == entrypoint_group:
- return True
-
-def is_zip_safe_distribution(distribution):
- zip_safe_meta = 'zip-safe'
- if distribution.has_metadata(zip_safe_meta):
- return True
+from Products.Basket.basket import Basket
basket = Basket()
initialize = basket.initialize
@@ -180,4 +31,3 @@
del resource.ImageResource
del resource.DTMLResource
del resource.PageTemplateResource
-
More information about the Zope-CVS
mailing list