[Zope3-checkins] CVS: ZODB4/src/zodb/storage/file - pack.py:1.6 main.py:1.4 format.py:1.4

Jeremy Hylton jeremy@zope.com
Thu, 15 May 2003 17:00:55 -0400


Update of /cvs-repository/ZODB4/src/zodb/storage/file
In directory cvs.zope.org:/tmp/cvs-serv2273/storage/file

Modified Files:
	pack.py main.py format.py 
Log Message:
Fix pack to do locking.

XXX Must backport to ZODB3.1.2 when this is finished.

The interim fix is to acquire the commit lock when the
copy-after-pack-time phase begins and to hold it until that phase is
over.  A better solution would be to release the lock periodically
during this phase and then re-acquire it, so that a writer waiting to
commit a new transaction gets a chance to run.


=== ZODB4/src/zodb/storage/file/pack.py 1.5 => 1.6 ===
--- ZODB4/src/zodb/storage/file/pack.py:1.5	Thu May  1 15:35:01 2003
+++ ZODB4/src/zodb/storage/file/pack.py	Thu May 15 17:00:54 2003
@@ -31,6 +31,7 @@
 from zodb.utils import p64, u64
 from zodb.storage.base import splitrefs
 from zodb.storage.file.copy import DataCopier
+from zodb.storage.file.errors import CorruptedDataError
 from zodb.storage.file.format import FileStorageFormatter
 from zodb.storage.file.index import fsIndex
 
@@ -302,6 +303,14 @@
             self._tfile.close()
             os.remove(self._name + ".pack")
             return None
+        self._commit_lock_acquire()
+        self.locked = True
+        self._lock_acquire()
+        try:
+            self._file.seek(0, 2)
+            self.file_end = self._file.tell()
+        finally:
+            self._lock_release()
         if ipos < self.file_end:
             self.copyRest(ipos)
 
@@ -432,39 +441,57 @@
         # After the pack time, all data records are copied.
         # Copy one txn at a time, using copy() for data.
 
-        while ipos < self.file_end:
-            th = self._read_txn_header(ipos)
-            pos = self._tfile.tell()
-            self._copier.setTxnPos(pos)
-            self._tfile.write(th.asString())
-            tend = ipos + th.tlen
-            ipos += th.headerlen()
-
-            while ipos < tend:
-                h = self._read_data_header(ipos)
-                ipos += h.recordlen()
-                if h.nrefs:
-                    refs = splitrefs(self._file.read(h.nrefs * 8))
-                else:
-                    refs = []
-                prev_txn = None
-                if h.plen:
-                    data = self._file.read(h.plen)
-                else:
-                    data, refs = self.fetchBackpointer(h.oid, h.back)
-                    if h.back:
-                        prev_txn = self.getTxnFromData(h.oid, h.back)
-
-                self._copier.copy(h.oid, h.serial, data, refs, h.version,
-                                  prev_txn, pos, self._tfile.tell())
-
-            tlen = self._tfile.tell() - pos
-            assert tlen == th.tlen
-            self._tfile.write(p64(tlen))
-            ipos += 8
-
-            self.index.update(self.tindex)
-            self.tindex.clear()
-            self.vindex.update(self.tvindex)
-            self.tvindex.clear()
+        # XXX Release the lock every N txns to let someone
+        # waiting to write a new txn have a go.
+
+
+        try:
+            while 1:
+                ipos = self.copyOne(ipos)
+        except CorruptedDataError, err:
+            # The last call to copyOne() will raise
+            # CorruptedDataError, because it will attempt to read past
+            # the end of the file.  Double-check that the exception
+            # occurred for this reason.
+            self._file.seek(0, 2)
+            endpos = self._file.tell()
+            if endpos != err.pos:
+                raise
+
+    def copyOne(self, ipos):
+        th = self._read_txn_header(ipos)
+        pos = self._tfile.tell()
+        self._copier.setTxnPos(pos)
+        self._tfile.write(th.asString())
+        tend = ipos + th.tlen
+        ipos += th.headerlen()
+
+        while ipos < tend:
+            h = self._read_data_header(ipos)
+            ipos += h.recordlen()
+            if h.nrefs:
+                refs = splitrefs(self._file.read(h.nrefs * 8))
+            else:
+                refs = []
+            prev_txn = None
+            if h.plen:
+                data = self._file.read(h.plen)
+            else:
+                data, refs = self.fetchBackpointer(h.oid, h.back)
+                if h.back:
+                    prev_txn = self.getTxnFromData(h.oid, h.back)
+
+            self._copier.copy(h.oid, h.serial, data, refs, h.version,
+                              prev_txn, pos, self._tfile.tell())
+
+        tlen = self._tfile.tell() - pos
+        assert tlen == th.tlen
+        self._tfile.write(p64(tlen))
+        ipos += 8
+
+        self.index.update(self.tindex)
+        self.tindex.clear()
+        self.vindex.update(self.tvindex)
+        self.tvindex.clear()
+        return ipos
 


=== ZODB4/src/zodb/storage/file/main.py 1.3 => 1.4 ===
--- ZODB4/src/zodb/storage/file/main.py:1.3	Thu May  1 15:35:01 2003
+++ ZODB4/src/zodb/storage/file/main.py	Thu May 15 17:00:54 2003
@@ -891,26 +891,30 @@
             opos = p.pack()
             if opos is None:
                 return
+            assert p.locked
             oldpath = self._name + ".old"
-            self._file.close()
+            self._lock_acquire()
             try:
-                if os.path.exists(oldpath):
-                    os.remove(oldpath)
-                os.rename(self._name, oldpath)
-            except Exception, msg:
-                self._file = open(self._name, 'r+b')
-                raise
+                self._file.close()
+                try:
+                    if os.path.exists(oldpath):
+                        os.remove(oldpath)
+                    os.rename(self._name, oldpath)
+                except Exception, msg:
+                    self._file = open(self._name, 'r+b')
+                    raise
 
-            # OK, we're beyond the point of no return
-            os.rename(self._name + '.pack', self._name)
-            self._file = open(self._name, 'r+b')
-            self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
-            self._pos = opos
-            self._save_index()
+                # OK, we're beyond the point of no return
+                os.rename(self._name + '.pack', self._name)
+                self._file = open(self._name, 'r+b')
+                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+                self._pos = opos
+                self._save_index()
+            finally:
+                self._lock_release()
         finally:
             if p.locked:
                 self._commit_lock_release()
-                self._lock_release()
             self._lock_acquire()
             self._packt = ZERO
             self._lock_release()


=== ZODB4/src/zodb/storage/file/format.py 1.3 => 1.4 ===
--- ZODB4/src/zodb/storage/file/format.py:1.3	Thu Apr 24 16:46:48 2003
+++ ZODB4/src/zodb/storage/file/format.py	Thu May 15 17:00:54 2003
@@ -126,7 +126,8 @@
 from zodb.interfaces import ZERO, MAXTID, POSKeyError, _fmt_oid
 from zodb.utils import u64, p64
 from zodb.storage.base import splitrefs
-from zodb.storage.file.errors import CorruptedError, FileStorageFormatError
+from zodb.storage.file.errors \
+     import CorruptedDataError, CorruptedError, FileStorageFormatError
 
 # the struct formats for the headers
 TRANS_HDR = ">8sQcHHH"
@@ -142,7 +143,7 @@
 
 def panic(message, *data):
     logger.critical(message, *data)
-    raise CorruptedTransactionError(message % data)
+    raise CorruptedError(message % data)
 
 class FileStorageFormatter:
     """Mixin class that can read and write the low-level format."""