[Zope-Checkins] CVS: ZODB3/ZODB - fspack.py:1.6 FileStorage.py:1.132

Jeremy Hylton jeremy@zope.com
Fri, 16 May 2003 16:19:16 -0400


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10581/ZODB

Modified Files:
	fspack.py FileStorage.py 
Log Message:
Backport fix for concurrent writing and packing.

The pack() implementation could leave a corrupted storage behind if
transactions were committed while a pack was in progress.


=== ZODB3/ZODB/fspack.py 1.5 => 1.6 ===
--- ZODB3/ZODB/fspack.py:1.5	Tue May 13 12:29:21 2003
+++ ZODB3/ZODB/fspack.py	Fri May 16 16:19:15 2003
@@ -45,6 +45,23 @@
 class CorruptedError(Exception):
     pass
 
+class CorruptedDataError(CorruptedError):
+
+    def __init__(self, oid=None, buf=None, pos=None):
+        self.oid = oid
+        self.buf = buf
+        self.pos = pos
+
+    def __str__(self):
+        if self.oid:
+            msg = "Error reading oid %s.  Found %r" % (_fmt_oid(self.oid),
+                                                       self.buf)
+        else:
+            msg = "Error reading unknown oid.  Found %r" % self.buf
+        if self.pos:
+            msg += " at %d" % self.pos
+        return msg
+
 # the struct formats for the headers
 TRANS_HDR = ">8s8scHHH"
 DATA_HDR = ">8s8s8s8sH8s"
@@ -672,6 +689,14 @@
             self._tfile.close()
             os.remove(self._name + ".pack")
             return None
+        self._commit_lock_acquire()
+        self.locked = True
+        self._lock_acquire()
+        try:
+            self._file.seek(0, 2)
+            self.file_end = self._file.tell()
+        finally:
+            self._lock_release()
         if ipos < self.file_end:
             self.copyRest(ipos)
 
@@ -795,34 +820,58 @@
         # After the pack time, all data records are copied.
         # Copy one txn at a time, using copy() for data.
 
-        while ipos < self.file_end:
-            th = self._read_txn_header(ipos)
-            pos = self._tfile.tell()
-            self._copier.setTxnPos(pos)
-            self._tfile.write(th.asString())
-            tend = ipos + th.tlen
-            ipos += th.headerlen()
-
-            while ipos < tend:
-                h = self._read_data_header(ipos)
-                ipos += h.recordlen()
-                prev_txn = None
-                if h.plen:
-                    data = self._file.read(h.plen)
-                else:
-                    data = self.fetchBackpointer(h.oid, h.back)
-                    if h.back:
-                        prev_txn = self.getTxnFromData(h.oid, h.back)
-
-                self._copier.copy(h.oid, h.serial, data, h.version,
-                                  prev_txn, pos, self._tfile.tell())
-
-            tlen = self._tfile.tell() - pos
-            assert tlen == th.tlen
-            self._tfile.write(p64(tlen))
-            ipos += 8
-
-            self.index.update(self.tindex)
-            self.tindex.clear()
-            self.vindex.update(self.tvindex)
-            self.tvindex.clear()
+        # Release the commit lock every 20 copies
+        self._lock_counter = 0
+
+        try:
+            while 1:
+                ipos = self.copyOne(ipos)
+        except CorruptedDataError, err:
+            # The last call to copyOne() will raise
+            # CorruptedDataError, because it will attempt to read past
+            # the end of the file.  Double-check that the exception
+            # occurred for this reason.
+            self._file.seek(0, 2)
+            endpos = self._file.tell()
+            if endpos != err.pos:
+                raise
+
+    def copyOne(self, ipos):
+        # The call below will raise CorruptedDataError at EOF.
+        th = self._read_txn_header(ipos)
+        self._lock_counter += 1
+        if self._lock_counter % 20 == 0:
+            self._commit_lock_release()
+        pos = self._tfile.tell()
+        self._copier.setTxnPos(pos)
+        self._tfile.write(th.asString())
+        tend = ipos + th.tlen
+        ipos += th.headerlen()
+
+        while ipos < tend:
+            h = self._read_data_header(ipos)
+            ipos += h.recordlen()
+            prev_txn = None
+            if h.plen:
+                data = self._file.read(h.plen)
+            else:
+                data = self.fetchBackpointer(h.oid, h.back)
+                if h.back:
+                    prev_txn = self.getTxnFromData(h.oid, h.back)
+
+            self._copier.copy(h.oid, h.serial, data, h.version,
+                              prev_txn, pos, self._tfile.tell())
+
+        tlen = self._tfile.tell() - pos
+        assert tlen == th.tlen
+        self._tfile.write(p64(tlen))
+        ipos += 8
+
+        self.index.update(self.tindex)
+        self.tindex.clear()
+        self.vindex.update(self.tvindex)
+        self.tvindex.clear()
+        if self._lock_counter % 20 == 0:
+            self._commit_lock_acquire()
+        return ipos
+


=== ZODB3/ZODB/FileStorage.py 1.131 => 1.132 ===
--- ZODB3/ZODB/FileStorage.py:1.131	Thu May  1 13:10:19 2003
+++ ZODB3/ZODB/FileStorage.py	Fri May 16 16:19:15 2003
@@ -1460,25 +1460,28 @@
             if opos is None:
                 return
             oldpath = self._file_name + ".old"
-            self._file.close()
+            self._lock_acquire()
             try:
-                if os.path.exists(oldpath):
-                    os.remove(oldpath)
-                os.rename(self._file_name, oldpath)
-            except Exception, msg:
-                self._file = open(self._file_name, 'r+b')
-                raise
+                self._file.close()
+                try:
+                    if os.path.exists(oldpath):
+                        os.remove(oldpath)
+                    os.rename(self._file_name, oldpath)
+                except Exception, msg:
+                    self._file = open(self._file_name, 'r+b')
+                    raise
 
-            # OK, we're beyond the point of no return
-            os.rename(self._file_name + '.pack', self._file_name)
-            self._file = open(self._file_name, 'r+b')
-            self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
-            self._pos = opos
-            self._save_index()
+                # OK, we're beyond the point of no return
+                os.rename(self._file_name + '.pack', self._file_name)
+                self._file = open(self._file_name, 'r+b')
+                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+                self._pos = opos
+                self._save_index()
+            finally:
+                self._lock_release()
         finally:
             if p.locked:
                 self._commit_lock_release()
-                self._lock_release()
             self._lock_acquire()
             self._packt = z64
             self._lock_release()