[Zope3-checkins] CVS: Zope3/src/zope/fssync - metadata.py:1.7
Fred L. Drake, Jr.
fred at zope.com
Wed Aug 6 18:35:15 EDT 2003
Update of /cvs-repository/Zope3/src/zope/fssync
In directory cvs.zope.org:/tmp/cvs-serv24204/fssync
Modified Files:
metadata.py
Log Message:
Always handle the fssync metadata though the zope.fssync.metadata module; never write
to the metadata file directly, or even use the @@Zope directory name directly.
=== Zope3/src/zope/fssync/metadata.py 1.6 => 1.7 ===
--- Zope3/src/zope/fssync/metadata.py:1.6 Fri Jul 25 16:18:48 2003
+++ Zope3/src/zope/fssync/metadata.py Wed Aug 6 17:34:41 2003
@@ -42,8 +42,7 @@
def getnames(self, dir):
"""Return the names of known non-empty metadata entries, sorted."""
- dir = realpath(dir)
- entries = self._getentries(dir)
+ entries = self.getmanager(dir).entries
names = [name for name, entry in entries.iteritems() if entry]
names.sort()
return names
@@ -58,45 +57,20 @@
"""
file = realpath(file)
dir, base = split(file)
- entries = self._getentries(dir)
- if base in entries:
- return entries[base]
- if case_insensitive:
- # Look for a case-insensitive match -- expensive!
- # XXX There's no test case for this code!
- nbase = normcase(base)
- matches = [b for b in entries if normcase(b) == nbase]
- if matches:
- if len(matches) > 1:
- raise KeyError("multiple entries match %r" % nbase)
- return entries[matches[0]]
- # Create a new entry
- entries[base] = entry = {}
- return entry
+ return self.getmanager(dir).getentry(base)
- def _getentries(self, dir):
+ def getmanager(self, dir):
+ dir = realpath(dir)
key = normcase(dir)
- if key in self.cache:
- entries = self.cache[key]
- else:
- efile = join(dir, "@@Zope", "Entries.xml")
- if isfile(efile):
- f = open(efile)
- try:
- data = f.read()
- finally:
- f.close()
- self.cache[key] = entries = load_entries(data)
- else:
- self.cache[key] = entries = {}
- self.originals[key] = copy.deepcopy(entries)
- return entries
+ if key not in self.cache:
+ self.cache[key] = DirectoryManager(dir)
+ return self.cache[key]
def flush(self):
errors = []
- for key in self.cache:
+ for dirinfo in self.cache.itervalues():
try:
- self.flushkey(key)
+ dirinfo.flush()
except (IOError, OSError), err:
errors.append(err)
if errors:
@@ -105,26 +79,72 @@
else:
raise IOError, tuple(errors)
- def flushkey(self, key):
- entries = self.cache[key]
- # Make a copy containing only the "live" (non-empty) entries
+
+class DirectoryManager:
+ def __init__(self, dir):
+ self.zdir = join(dir, "@@Zope")
+ self.efile = join(self.zdir, "Entries.xml")
+ if isfile(self.efile):
+ f = open(self.efile)
+ try:
+ data = f.read()
+ finally:
+ f.close()
+ self.entries = load_entries(data)
+ else:
+ self.entries = {}
+ self.originals = copy.deepcopy(self.entries)
+
+ def ensure(self):
+ """Dump the entries even if there's an empty set."""
+ self._dump(self._get_live())
+
+ def flush(self):
+ """Dump the entries if different from the set stored on disk."""
+ live = self._get_live()
+ if live != self.originals:
+ if exists(self.efile) or live:
+ self._dump(live)
+ self.originals = copy.deepcopy(live)
+
+ def _get_live(self):
+ """Retrieve the set of all 'live' entries."""
live = {}
- for name, entry in entries.iteritems():
+ for name, entry in self.entries.iteritems():
if entry:
live[name] = entry
- if live != self.originals[key]:
- zdir = join(key, "@@Zope")
- efile = join(zdir, "Entries.xml")
- if exists(efile) or live:
- data = dump_entries(live)
- if not exists(zdir):
- os.makedirs(zdir)
- f = open(efile, "w")
- try:
- f.write(data)
- finally:
- f.close()
- self.originals[key] = copy.deepcopy(live)
+ return live
+
+ def _dump(self, live):
+ """Write entries to disk."""
+ data = dump_entries(live)
+ if not exists(self.zdir):
+ os.makedirs(self.zdir)
+ f = open(self.efile, "w")
+ try:
+ f.write(data)
+ finally:
+ f.close()
+
+ def getentry(self, name):
+ """Return a single named entry.
+
+ If there's no matching entry, an empty entry is created.
+ """
+ if name in self.entries:
+ return self.entries[name]
+ if case_insensitive:
+ # Look for a case-insensitive match -- expensive!
+ # XXX There's no test case for this code!
+ nbase = normcase(name)
+ matches = [b for b in entries if normcase(b) == nbase]
+ if matches:
+ if len(matches) > 1:
+ raise KeyError("multiple entries match %r" % nbase)
+ return self.entries[matches[0]]
+ # Create a new entry
+ self.entries[name] = entry = {}
+ return entry
def dump_entries(entries):
More information about the Zope3-Checkins
mailing list