[Zope3-checkins] CVS: Zope3/src/zope/fssync - bundle.py:1.2
fsbundle.py:1.2 metadata.py:1.9
Fred L. Drake, Jr.
fred at zope.com
Fri Aug 15 19:23:21 EDT 2003
Update of /cvs-repository/Zope3/src/zope/fssync
In directory cvs.zope.org:/tmp/cvs-serv30623
Modified Files:
bundle.py fsbundle.py metadata.py
Log Message:
Add support create a bundle from another bundle or from a site manangement
folder.
=== Zope3/src/zope/fssync/bundle.py 1.1 => 1.2 ===
--- Zope3/src/zope/fssync/bundle.py:1.1 Tue Aug 12 18:08:34 2003
+++ Zope3/src/zope/fssync/bundle.py Fri Aug 15 18:23:15 2003
@@ -60,10 +60,15 @@
if type:
raise Usage("-t/--type can only be given once")
type = arg
- if len(args) != 1:
+ source = None
+ if len(args) == 1:
+ path = args[0]
+ elif len(args) == 2:
+ path, source = args
+ else:
raise Usage("create requires exactly one path")
fs = FSBundle()
- fs.create(args[0], type, factory)
+ fs.create(path, type, factory, source)
command_table = [
=== Zope3/src/zope/fssync/fsbundle.py 1.1 => 1.2 ===
--- Zope3/src/zope/fssync/fsbundle.py:1.1 Tue Aug 12 18:08:34 2003
+++ Zope3/src/zope/fssync/fsbundle.py Fri Aug 15 18:23:15 2003
@@ -18,11 +18,15 @@
import os
import posixpath
+import shutil
from zope.fssync.fssync import FSSync
from zope.fssync.fsutil import Error
from zope.fssync.metadata import Metadata
+BUNDLE_TYPE = "zope.app.services.bundle.Bundle"
+FOLDER_TYPE = "zope.app.services.folder.SiteManagementFolder"
+
class FSBundle(object):
@@ -32,14 +36,26 @@
# bundle operations
- def create(self, path, type, factory):
+ def create(self, path, type, factory, source=None):
+ print (path, source)
if os.path.exists(path):
raise Error("%r already exists", path)
dir, name = os.path.split(path)
self.check_name(name)
- self.check_parent_directory(dir)
- if factory is None and type is None:
- factory = type = "zope.app.services.bundle.Bundle"
+ self.check_directory(dir)
+ self.check_directory_known(dir)
+ if source is not None:
+ self.check_source(source)
+ if type is None and factory is None:
+ srctype, srcfactory = self.get_typeinfo(source)
+ if srctype == FOLDER_TYPE:
+ factory = type = BUNDLE_TYPE
+ else:
+ # source is already a bundle; create the same type
+ type = srctype
+ factory = srcfactory
+ elif factory is None and type is None:
+ factory = type = BUNDLE_TYPE
self.sync.mkdir(path)
entry = self.metadata.getentry(path)
assert entry.get("flag") == "added"
@@ -47,19 +63,59 @@
entry["factory"] = factory
if type:
entry["type"] = type
+ self.copychildren(source, path, entry["path"])
self.metadata.flush()
# helper methods
- def check_parent_directory(self, dir):
+ def copy(self, src, dst, name, path):
+ # Copy src to dst, including relevant metadata, Extra, and
+ # Annotations components.
+ type, factory = self.get_typeinfo(src)
+ if os.path.isdir(src):
+ os.mkdir(dst)
+ self.sync.add(dst, type, factory)
+ self.copychildren(src, dst, path)
+ else:
+ shutil.copyfile(src, dst)
+ self.sync.add(dst, type, factory)
+
+ def copychildren(self, src, dst, path):
+ for name in self.metadata.getnames(src):
+ self.copy(os.path.join(src, name),
+ os.path.join(dst, name),
+ name,
+ posixpath.join(path, name))
+
+ def check_source(self, source):
+ # make sure the source is a site-management folder or a bundle
+ if not os.path.exists(source):
+ raise Error("%r does not exist", source)
+ if not os.path.isdir(source):
+ raise Error("%r must be a directory", source)
+ self.check_directory_known(os.path.dirname(source))
+ self.check_directory_known(source)
+ type, factory = self.get_typeinfo(source)
+ if type == BUNDLE_TYPE:
+ pass
+ elif type == FOLDER_TYPE:
+ pass
+ else:
+ # don't know; play it safe
+ raise Error(
+ "%r doesn't appear to be a bundle or site-management folder",
+ source)
+
+ def check_directory(self, dir):
if dir:
if not os.path.exists(dir):
raise Error("%r does not exist", dir)
if not os.path.isdir(dir):
raise Error("%r is not a directory", dir)
- else:
- dir = os.curdir
- # XXX this might not be the right check
+ # else: os.curdir assumed
+
+ def check_directory_known(self, dir):
+ dir = dir or os.curdir
entry = self.metadata.getentry(dir)
if not entry:
raise Error("nothing known about", dir)
@@ -90,3 +146,7 @@
except ValueError:
p3 = parts[3]
return (n0, n1, n2, p3)
+
+ def get_typeinfo(self, path):
+ entry = self.metadata.getentry(path)
+ return entry.get("type"), entry.get("factory")
=== Zope3/src/zope/fssync/metadata.py 1.8 => 1.9 ===
--- Zope3/src/zope/fssync/metadata.py:1.8 Tue Aug 12 00:39:22 2003
+++ Zope3/src/zope/fssync/metadata.py Fri Aug 15 18:23:15 2003
@@ -28,7 +28,7 @@
from cStringIO import StringIO
from os.path import exists, isdir, isfile, split, join, realpath, normcase
-from xml.sax import ContentHandler, parseString
+from xml.sax import ContentHandler, parse, parseString
from xml.sax.saxutils import quoteattr
case_insensitive = (normcase("ABC") == normcase("abc"))
@@ -85,12 +85,7 @@
self.zdir = join(dir, "@@Zope")
self.efile = join(self.zdir, "Entries.xml")
if isfile(self.efile):
- f = open(self.efile)
- try:
- data = f.read()
- finally:
- f.close()
- self.entries = load_entries(data)
+ self.entries = load_entries_path(self.efile)
else:
self.entries = {}
self.originals = copy.deepcopy(self.entries)
@@ -175,6 +170,29 @@
return loads(text)
else:
return ch.entries
+
+
+def load_entries_path(path):
+ f = open(path, 'rb')
+ ch = EntriesHandler()
+ try:
+ try:
+ parse(f, ch)
+ except FoundXMLPickle:
+ pass
+ else:
+ return ch.entries
+ finally:
+ f.close()
+
+ # found an XML pickle; load that instead
+ from zope.xmlpickle import loads
+ f = open(path, 'rb')
+ try:
+ return loads(f.read())
+ finally:
+ f.close()
+
class EntriesHandler(ContentHandler):
More information about the Zope3-Checkins
mailing list