[Zope-Checkins] CVS: ZODB3/ZODB - fspack.py:1.6 FileStorage.py:1.132
Jeremy Hylton
jeremy@zope.com
Fri, 16 May 2003 16:19:16 -0400
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10581/ZODB
Modified Files:
fspack.py FileStorage.py
Log Message:
Backport fix for concurrent writing and packing.
The pack() implementation would leave a corrupted storage behind if
transactions were committed while a pack was in progress.
=== ZODB3/ZODB/fspack.py 1.5 => 1.6 ===
--- ZODB3/ZODB/fspack.py:1.5 Tue May 13 12:29:21 2003
+++ ZODB3/ZODB/fspack.py Fri May 16 16:19:15 2003
@@ -45,6 +45,23 @@
class CorruptedError(Exception):
pass
+class CorruptedDataError(CorruptedError):
+
+ def __init__(self, oid=None, buf=None, pos=None):
+ self.oid = oid
+ self.buf = buf
+ self.pos = pos
+
+ def __str__(self):
+ if self.oid:
+ msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
+ self.buf)
+ else:
+ msg = "Error reading unknown oid. Found %r" % self.buf
+ if self.pos:
+ msg += " at %d" % self.pos
+ return msg
+
# the struct formats for the headers
TRANS_HDR = ">8s8scHHH"
DATA_HDR = ">8s8s8s8sH8s"
@@ -672,6 +689,14 @@
self._tfile.close()
os.remove(self._name + ".pack")
return None
+ self._commit_lock_acquire()
+ self.locked = True
+ self._lock_acquire()
+ try:
+ self._file.seek(0, 2)
+ self.file_end = self._file.tell()
+ finally:
+ self._lock_release()
if ipos < self.file_end:
self.copyRest(ipos)
@@ -795,34 +820,58 @@
# After the pack time, all data records are copied.
# Copy one txn at a time, using copy() for data.
- while ipos < self.file_end:
- th = self._read_txn_header(ipos)
- pos = self._tfile.tell()
- self._copier.setTxnPos(pos)
- self._tfile.write(th.asString())
- tend = ipos + th.tlen
- ipos += th.headerlen()
-
- while ipos < tend:
- h = self._read_data_header(ipos)
- ipos += h.recordlen()
- prev_txn = None
- if h.plen:
- data = self._file.read(h.plen)
- else:
- data = self.fetchBackpointer(h.oid, h.back)
- if h.back:
- prev_txn = self.getTxnFromData(h.oid, h.back)
-
- self._copier.copy(h.oid, h.serial, data, h.version,
- prev_txn, pos, self._tfile.tell())
-
- tlen = self._tfile.tell() - pos
- assert tlen == th.tlen
- self._tfile.write(p64(tlen))
- ipos += 8
-
- self.index.update(self.tindex)
- self.tindex.clear()
- self.vindex.update(self.tvindex)
- self.tvindex.clear()
+ # Release the commit lock every 20 copies
+ self._lock_counter = 0
+
+ try:
+ while 1:
+ ipos = self.copyOne(ipos)
+ except CorruptedDataError, err:
+ # The last call to copyOne() will raise
+ # CorruptedDataError, because it will attempt to read past
+ # the end of the file. Double-check that the exception
+ # occurred for this reason.
+ self._file.seek(0, 2)
+ endpos = self._file.tell()
+ if endpos != err.pos:
+ raise
+
+ def copyOne(self, ipos):
+ # The call below will raise CorruptedDataError at EOF.
+ th = self._read_txn_header(ipos)
+ self._lock_counter += 1
+ if self._lock_counter % 20 == 0:
+ self._commit_lock_release()
+ pos = self._tfile.tell()
+ self._copier.setTxnPos(pos)
+ self._tfile.write(th.asString())
+ tend = ipos + th.tlen
+ ipos += th.headerlen()
+
+ while ipos < tend:
+ h = self._read_data_header(ipos)
+ ipos += h.recordlen()
+ prev_txn = None
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ data = self.fetchBackpointer(h.oid, h.back)
+ if h.back:
+ prev_txn = self.getTxnFromData(h.oid, h.back)
+
+ self._copier.copy(h.oid, h.serial, data, h.version,
+ prev_txn, pos, self._tfile.tell())
+
+ tlen = self._tfile.tell() - pos
+ assert tlen == th.tlen
+ self._tfile.write(p64(tlen))
+ ipos += 8
+
+ self.index.update(self.tindex)
+ self.tindex.clear()
+ self.vindex.update(self.tvindex)
+ self.tvindex.clear()
+ if self._lock_counter % 20 == 0:
+ self._commit_lock_acquire()
+ return ipos
+
=== ZODB3/ZODB/FileStorage.py 1.131 => 1.132 ===
--- ZODB3/ZODB/FileStorage.py:1.131 Thu May 1 13:10:19 2003
+++ ZODB3/ZODB/FileStorage.py Fri May 16 16:19:15 2003
@@ -1460,25 +1460,28 @@
if opos is None:
return
oldpath = self._file_name + ".old"
- self._file.close()
+ self._lock_acquire()
try:
- if os.path.exists(oldpath):
- os.remove(oldpath)
- os.rename(self._file_name, oldpath)
- except Exception, msg:
- self._file = open(self._file_name, 'r+b')
- raise
+ self._file.close()
+ try:
+ if os.path.exists(oldpath):
+ os.remove(oldpath)
+ os.rename(self._file_name, oldpath)
+ except Exception, msg:
+ self._file = open(self._file_name, 'r+b')
+ raise
- # OK, we're beyond the point of no return
- os.rename(self._file_name + '.pack', self._file_name)
- self._file = open(self._file_name, 'r+b')
- self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
- self._pos = opos
- self._save_index()
+ # OK, we're beyond the point of no return
+ os.rename(self._file_name + '.pack', self._file_name)
+ self._file = open(self._file_name, 'r+b')
+ self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+ self._pos = opos
+ self._save_index()
+ finally:
+ self._lock_release()
finally:
if p.locked:
self._commit_lock_release()
- self._lock_release()
self._lock_acquire()
self._packt = z64
self._lock_release()