[Zope-CVS] CVS: Products/AdaptableStorage/gateway_fs - FSConnection.py:1.2 FSDirectoryItems.py:1.2 TransactionalWrites.py:NONE
Shane Hathaway
shane@zope.com
Tue, 3 Dec 2002 18:11:23 -0500
Update of /cvs-repository/Products/AdaptableStorage/gateway_fs
In directory cvs.zope.org:/tmp/cvs-serv22143/gateway_fs
Modified Files:
FSConnection.py FSDirectoryItems.py
Removed Files:
TransactionalWrites.py
Log Message:
Running AdaptableStorage with the latest Zope revealed some flaws.
Fixed them all.
- Consistent ordering of transaction participants now makes it impossible to
add a jar to the transaction after the commit() method has begun.
AdaptableStorage (and perhaps other projects like ZPatterns) relied on
the ability to add a jar after commit has started. This could lead to
a deadlock. Reworked ASStorage, FSConnection, and the tests to deal with
this.
- Serials are now required to be hashable. This makes serials, used to
prevent conflicts, simpler and more robust.
- DBTab needs some kind of class it can call directly, so I added
the small subclasses FSStorage and FSDatabase to Zope2FS.
- Restored the PersistentExtra patch.
- The directory items gateway wants to write data about its children, but
sometimes its children aren't being written at the same time. Added
an optional "conditional" flag to FSConnection.writeSection(), allowing
data to be written only if other data for the same subpath gets written.
=== Products/AdaptableStorage/gateway_fs/FSConnection.py 1.1 => 1.2 ===
--- Products/AdaptableStorage/gateway_fs/FSConnection.py:1.1 Wed Nov 27 13:37:05 2002
+++ Products/AdaptableStorage/gateway_fs/FSConnection.py Tue Dec 3 18:10:52 2002
@@ -23,7 +23,6 @@
from interfaces.public import IFSConnection
from exceptions import FSWriteError
-from TransactionalWrites import getTransactionalWrites
# Try to decipher this one ;-)
@@ -52,9 +51,15 @@
def __init__(self, basepath):
self.basepath = basepath
+ self._final = 0
+ # _pending holds the data to be written.
+ # _pending: { subpath string -> { section_name -> data } }
+ self._pending = {}
+ # _conditional holds data that will be written only if the
+ # rest of the data for a subpath is in _pending.
+ # _conditional: { subpath string -> { section_name -> data } }
+ self._conditional = {}
- def sortKey(self):
- return self.basepath
def expandPath(self, subpath, check_exists=0):
if self.basepath:
@@ -65,7 +70,7 @@
# unchanged.
path = subpath
if check_exists:
- assert os.path.exists(path), path
+ assert os.path.exists(path), '%s does not exist' % path
return path
@@ -78,7 +83,7 @@
assert section_name != DATA_SECTION
- def _write(self, subpath, section_name, data):
+ def _write(self, subpath, section_name, data, conditional=0):
# XXX We should be checking for '..'
path = self.expandPath(subpath)
# Do some early checking.
@@ -87,12 +92,12 @@
if not v:
raise FSWriteError(
"Can't get write access to %s" % subpath)
- getTransactionalWrites(self).record(subpath, section_name, data)
+ self.record(subpath, section_name, data, conditional)
- def writeSection(self, subpath, section_name, data):
+ def writeSection(self, subpath, section_name, data, conditional=0):
self.checkSectionName(section_name)
- self._write(subpath, section_name, data)
+ self._write(subpath, section_name, data, conditional)
def writeNodeType(self, subpath, data):
@@ -103,8 +108,7 @@
if (want_dir != (not not os.path.isdir(path))):
raise FSWriteError(
"Can't mix file and directory at %s" % subpath)
- getTransactionalWrites(self).record(
- subpath, NODE_TYPE_SECTION, data)
+ self.record(subpath, NODE_TYPE_SECTION, data, 0)
def writeData(self, subpath, data):
@@ -221,9 +225,9 @@
props_f.close()
- def removeUnlinkedItems(self, path, items):
+ def removeUnlinkedItems(self, path, names):
linked = {}
- for name in items:
+ for name in names:
linked[name] = 1
existing = os.listdir(path)
for fn in existing:
@@ -242,8 +246,10 @@
non_containers = {}
for subpath, sections in items:
# type must be provided and must always be either 'd' or 'f'.
- if not sections.has_key(NODE_TYPE_SECTION):
- raise FSWriteError('node type not specified for %s' % subpath)
+ if (not sections.has_key(NODE_TYPE_SECTION)
+ or not sections.has_key(DATA_SECTION)):
+ raise FSWriteError(
+ 'Data or node type not specified for %s' % subpath)
t = sections[NODE_TYPE_SECTION]
if t not in 'df':
raise FSWriteError(
@@ -254,8 +260,77 @@
"Not a directory: %s" % dir)
if t == 'f':
non_containers[subpath] = 1
+ if not isinstance(sections[DATA_SECTION], StringType):
+ raise FSWriteError(
+ 'Data for a file must be a string at %s'
+ % subpath)
else:
if isinstance(sections[DATA_SECTION], StringType):
raise FSWriteError(
'Data for a directory must be a list or tuple at %s'
% subpath)
+
+ #
+ # ITPCConnection implementation
+ #
+
+ def sortKey(self):
+ return self.basepath
+
+ def getName(self):
+ return self.basepath
+
+ def record(self, subpath, section_name, data, conditional=0):
+ """Records data to be written at commit time"""
+ if conditional:
+ m = self._conditional
+ else:
+ m = self._pending
+ sections = m.get(subpath)
+ if sections is None:
+ sections = {}
+ m[subpath] = sections
+ if sections.has_key(section_name):
+ if sections[section_name] != data:
+ raise FSWriteError(
+ 'Conflicting data storage at %s (%s)' %
+ (subpath, section_name))
+ else:
+ sections[section_name] = data
+
+ def begin(self):
+ pass
+
+ def vote(self):
+ """Do some early verification
+
+ This is done while the transaction can still be vetoed safely.
+ """
+ for subpath, sections in self._conditional.items():
+ if self._pending.get(subpath):
+ # Add the conditional data.
+ for section_name, data in sections.items():
+ self.record(subpath, section_name, data)
+ self._conditional.clear()
+ items = self._pending.items()
+ items.sort() # Ensure that base directories come first.
+ self.beforeWrite(items)
+ self._final = 1
+
+ def abort(self):
+ self._final = 0
+ self._pending.clear()
+ self._conditional.clear()
+
+ def finish(self):
+ if self._final:
+ self._final = 0
+ try:
+ items = self._pending.items()
+ items.sort() # Ensure that base directories come first.
+ for subpath, sections in items:
+ self.writeFinal(subpath, sections)
+ finally:
+ self._pending.clear()
+ self._conditional.clear()
+
=== Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py 1.1 => 1.2 ===
--- Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py:1.1 Wed Nov 27 13:37:05 2002
+++ Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py Tue Dec 3 18:10:52 2002
@@ -34,6 +34,7 @@
def getSchema(self):
return self.schema
+
def load(self, object_mapper, key):
c = self.fs_conn
assert c.readNodeType(key) == 'd'
@@ -59,10 +60,10 @@
res.append((name, classification))
items = classification.items()
items.sort()
- serial.append((name, items))
+ serial.append((name, tuple(items)))
res.sort()
serial.sort()
- return tuple(res), serial
+ return tuple(res), tuple(serial)
def store(self, object_mapper, key, state):
@@ -75,14 +76,14 @@
subkey = '%s/%s' % (key, name)
items = classification.items()
items.sort()
- serial.append((name, items))
+ serial.append((name, tuple(items)))
text = []
for k, v in items:
text.append('%s=%s' % (k, v))
text = '\n'.join(text)
- c.writeSection(subkey, 'classification', text)
+ c.writeSection(subkey, 'classification', text, 1)
names.sort()
serial.sort()
c.writeData(key, names)
- return serial
+ return tuple(serial)
=== Removed File Products/AdaptableStorage/gateway_fs/TransactionalWrites.py ===