[Zodb-checkins] CVS: Zope3/src/zodb/storage - file.py:1.23.2.1

Jeremy Hylton jeremy@zope.com
Thu, 10 Apr 2003 18:41:13 -0400


Update of /cvs-repository/Zope3/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv6526

Modified Files:
      Tag: jeremy-new-pack-branch
	file.py 
Log Message:
Incomplete progress on the new pack for FileStorage.



=== Zope3/src/zodb/storage/file.py 1.23 => 1.23.2.1 ===
--- Zope3/src/zodb/storage/file.py:1.23	Wed Apr  9 14:00:43 2003
+++ Zope3/src/zodb/storage/file.py	Thu Apr 10 18:41:12 2003
@@ -132,6 +132,7 @@
 from __future__ import generators
 
 import os
+import stat
 import sys
 import time
 import errno
@@ -452,8 +453,18 @@
         s = struct.pack(">QQ", pnv, vprev)
         file.write(s + version)
 
-    def _read_txn_header(self, pos):
-        pass
+    def _read_txn_header(self, pos, tid=None):
+        self._file.seek(pos)
+        s = self._file.read(TRANS_HDR_LEN)
+        if len(s) != TRANS_HDR_LEN:
+            raise CorruptedDataError(tid, s, pos)
+        h = TxnHeader.fromString(s)
+        if tid is not None and tid != h.tid:
+            raise CorruptedDataError(tid, s, pos)
+        h.user = self._file.read(h.ulen)
+        h.descr = self._file.read(h.dlen)
+        h.ext = self._file.read(h.elen)
+        return h
 
     def _loadBack_impl(self, oid, back):
         # shared implementation used by various _loadBack methods
@@ -889,11 +900,8 @@
         # Return backpointer to oid in data record in transaction at tpos.
         # It should contain a pickle identical to data. Returns 0 on failure.
         # Must call with lock held.
-        self._file.seek(tpos)
-        h = self._file.read(TRANS_HDR_LEN)
-        tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
-        self._file.read(ul + dl + el)
-        tend = tpos + tl
+        h = self._read_txn_header(tpos)
+        tend = tpos + h.tlen
         pos = self._file.tell()
         while pos < tend:
             h = self._read_data_header(pos)
@@ -1327,14 +1335,18 @@
         otloc = self._pos
         here = self._pos + self._tfile.tell() + self._thl
         # Let's move the file pointer back to the start of the txn record.
-        self._file.seek(tpos)
-        h = self._file.read(TRANS_HDR_LEN)
-        if h[16] != ' ':
+        th = self._read_txn_header(tpos)
+##        self._file.seek(tpos)
+##        h = self._file.read(TRANS_HDR_LEN)
+##        if h[16] != ' ':
+##            raise UndoError(None, "non-undoable transaction")
+##        tl = u64(h[8:16])
+##        ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
+        if th.status != " ":
             raise UndoError(None, "non-undoable transaction")
-        tl = u64(h[8:16])
-        ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
-        tend = tpos + tl
-        pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
+            
+        tend = tpos + th.tlen
+        pos = tpos + th.headerlen()
         tindex = {}
 
         # keep track of failures, cause we may succeed later
@@ -1523,6 +1535,21 @@
         self._commit_lock_acquire = cla
         self._commit_lock_release = clr
 
+        # The packer will use several indexes.
+        # index: oid -> pos
+        # vindex: version -> pos of XXX
+        # tindex: oid -> pos, for current txn
+        # tvindex: version -> pos of XXX, for current txn
+        
+        self.index = fsIndex()
+        self.vindex = {}
+        self.tindex = {}
+        self.tvindex = {}
+
+        # Index for non-version data.  This is a temporary structure
+        # to reduce I/O during packing
+        self.nvindex = fsIndex()
+
     def _loada(self, oid, _index):
         """Read any version and return the version"""
         try:
@@ -1546,55 +1573,166 @@
         return self._file.read(1) != ' ' # XXX or == "p"?
 
     def _pack_index(self, index):
-        # Return an index for everything reachable at the pack time.
+        """Return packpos and an index of objects reachable at the pack time.
+        
+        If the storage is empty or it has been packed to a later time,
+        return None.
+        """
+        # index is a complete index as of the pack time.
+        index = fsIndex()
+        vindex = {}
+        tindex = {}
+        packpos, maxoid, ltid = self._read_index(index, vindex, tindex,
+                                                 self._stop, read_only=1)
+        if packpos == self._metadata_size:
+            return None, None
+        if self._redundant_pack(packpos):
+            return None, None
+        del vindex, tindex
+
+        # Now traverse all objects starting from the root and add
+        # them to pindex.  Any object in index but not in pindex
+        # is unreachable and should not be copied.
         rootl = [ZERO]
         pindex = fsIndex()
         while rootl:
             oid = rootl.pop()
             if oid in pindex:
                 continue
-            # XXX We might need to catch exceptions raised by _loada().
             p, refs, v = self._loada(oid, index)
             rootl.extend(refs)
             pindex[oid] = index[oid]
-        return pindex
+        return packpos, pindex
 
     def pack(self):
+        # Pack copies all data reachable at the pack time or later.
+        #
+        # Copying occurs in two phases.  In the first phase, txns
+        # before the pack time are copied if they contain any reachable
+        # data.  In the second phase, all txns after the pack time
+        # are copied.
+        #
+        # Txn and data records contain pointers to previous records.
+        # Because these pointers are stored as file offsets, they
+        # must be updated when we copy data.
+        
         packing = True
+        file_end = os.stat(self._file.name)[stat.ST_SIZE]
+        packpos, pindex = self._pack_index()
+        assert packpos <= file_end
 
-        ##################################################################
-        # Step 1, get index as of pack time that has only referenced objects.
-        index = fsIndex()
-        vindex = {}
-        # tindex is the index for the current transaction
-        tindex = {}
-        tvindex = {}
-        packpos, maxoid, ltid = self._read_index(index, vindex, tindex,
-                                                 self._stop, read_only=1)
-        if packpos == self._metadata_size:
-            return
-        if self._redundant_pack(packpos):
-            return
-        pindex = self._pack_index(index)
+        # Setup the destination file and copy the metadata.
+        self.ofile = open(self._name + ".pack", "w+b")
+        self._file.seek(0)
+        self.ofile.write(self._file.read(self._metadata_size))
 
-        ##################################################################
-        # Step 2, copy data and compute new index based on new positions.
-        index = fsIndex()
-        vindex = {}
-        tindex = {}
-        tvindex = {}
+        self.copyToPacktime(packpos)
+        self.copyRest()
 
-        # Index for non-version data.  This is a temporary structure
-        # to reduce I/O during packing
-        nvindex = fsIndex()
+        # OK, we've copied everything. Now we need to wrap things up.
+        self.ofile.flush()
+        self.ofile.close()
+        self._file.close()
+
+        # XXX probably don't need to return indexes, since caller
+        # has object
+        return opos, self.index, self.vindex, self.tindex, self.tvindex
+
+    def isCurNonversion(self, h, nvpos, curpos):
+        """Return True if h is current non-version data,
+
+        where the current data is at curpos.  Always restores the file
+        to the position it was at before the call.
+        """
+        pos = self._file.tell()
+        try:
+            # XXX probably don't want to read this data every time.
+            # instead, keep a cache of whether curpos is in a version.
+            # note that entries can be thrown out after passing curpos.
+            cur = self._read_data_header(curpos)
+            assert cur.oid == h.oid
+            if not cur.version:
+                return False
+            # If cur has a pnv, we need to chase backpointers until
+            # we get to the actual data.
+            self._file.read(cur.nrefs * 8)
+            pnv = self._loadBackPOS(cur.oid, cur.pnv)
+            return pnv == nvpos
+        finally:
+            self._file.seek(pos)
 
-        ofile = open(self._name + '.pack', 'w+b')
-        pv = ZERO
+    def copyToPacktime(self, packpos):
         offset = 0L  # the amount of space freed by packing
-        pos = opos = self._metadata_size
-        # Copy the metadata from the old file to the new one.
-        self._file.seek(0)
-        ofile.write(self._file.read(self._metadata_size))
+        pos = self._metadata_size
+        new_pos = pos
+
+        while pos <= packpos:
+            th = self._read_txn_header(pos)
+            copy = False
+            tend = pos + th.tlen
+            pos += th.headerlen()
+            while pos < tend:
+                h = self._read_data_header(pos)
+                cur_nonversion = False
+                
+                # If this data record isn't current, don't copy it.
+                # If the current record is in a version, this may be
+                # the current non-version data.
+                curpos = pindex.get(h.oid, 0)
+                if curpos != pos:
+                    if self.isCurNonversion(h, curpos):
+                        cur_nonversion = True
+                    else:
+                        pos += h.recordlen()
+                        continue
+
+                # If we are going to copy any data, we need to copy
+                # the transaction header.  Note that we will need to
+                # patch up the transaction length when we are done.
+                if not copy:
+                    th.status = "p"
+                    s = th.asString()
+                    self.ofile.write(s)
+                    new_tpos = new_pos
+                    new_pos += len(s)
+                    copy = True
+
+                if cur_nonversion:
+                    self.nvindex[h.oid] = new_pos
+
+                if h.plen:
+                    refs = self._file.read(8 * h.nrefs)
+                    data = self._file.read(h.plen)
+                else:
+                    # If a current record has a backpointer, fetch
+                    # refs and data from the backpointer.  We need
+                    # to write the data in the new record.
+                    data, refs, serial, tid = self._loadBackTxn(h.oid, h.back)
+
+                self.writeDataRecord(h, data, refs, new_pos)
+                new_pos = self.ofile.tell()
+
+            if copy:
+                # now fix up the transaction header
+                pass
+        
+
+    def writeDataRecord(self, h, data, refs, new_pos):
+        # Update the header to reflect current information, then write
+        # it to the output file.
+        h.prev = 0
+        h.back = 0
+        h.plen = len(data)
+        h.nrefs = len(refs) / 8
+        h.tloc = new_pos
+        if h.version:
+            h.pnv = self.nvindex[h.oid]
+            h.vprev = 0
+        self.ofile.write(h.asString())
+        self.ofile.write(refs)
+        self.ofile.write(data)
+
+    def XXX(self):
 
         # Copy the data in two stages.  In the packing stage, we skip
         # records that are non-current or that are for unreferenced
@@ -1624,28 +1762,22 @@
                 self.locked = True
 
             # Read the transaction record
-            self._file.seek(pos)
-            h = self._file.read(TRANS_HDR_LEN)
-            if len(h) < TRANS_HDR_LEN:
+            if pos == file_end:
                 break
-            tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
-            if status == 'c':
+            th = self._read_txn_header(pos)
+            if th.status == 'c':
                 # Oops. we found a checkpoint flag.
                 break
             tpos = pos
-            tend = tpos + tl
+            tend = tpos + th.tlen
 
             otpos = opos # start pos of output trans
 
             # write out the transaction record
             status = packing and 'p' or ' '
-            ofile.write(h[:16] + status + h[17:])
-            thl = ul + dl + el
-            h = self._file.read(thl)
-            if len(h) != thl:
-                raise PackError(opos)
-            ofile.write(h)
-            thl = TRANS_HDR_LEN + thl
+            th.status = status
+            ofile.write(th.asString())
+            thl = th.headerlen()
             pos = tpos + thl
             opos = otpos + thl
 
@@ -1780,7 +1912,7 @@
 
             # Now, maybe we need to hack or delete the transaction
             otl = opos - otpos
-            if otl != tl:
+            if otl != th.tlen:
                 # Oops, what came out is not what came in!
 
                 # Check for empty:
@@ -2139,6 +2271,15 @@
 
     fromString = classmethod(fromString)
 
+    def asString(self):
+        s = struct.pack(DATA_HDR, self.oid, self.serial, self.tloc,
+                        self.vlen, self.plen, self.nrefs, self.back)
+        if self.version:
+            v = struct.pack(">QQ", self.pnv, self.vprev)
+            return s + v + self.version
+        else:
+            return s
+
     def parseVersion(self, buf):
         self.pnv, self.vprev = struct.unpack(">QQ", buf[:16])
         self.version = buf[16:]
@@ -2148,6 +2289,33 @@
         if self.version:
             rlen += 16 + self.vlen
         return rlen
+
+class TxnHeader:
+    """Header for a transaction record."""
+
+    __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
+                 "ulen", "dlen", "elen")
+
+    def __init__(self, tid, tlen, status, ulen, dlen, elen):
+        self.tid = tid
+        self.tlen = tlen
+        self.status = status
+        self.ulen = ulen
+        self.dlen = dlen
+        self.elen = elen
+
+    def fromString(cls, s):
+        return cls(*struct.unpack(TRANS_HDR, s))
+
+    fromString = classmethod(fromString)
+
+    def asString(self):
+        s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
+                        self.ulen, self.dlen, self.elen)
+        return "".join([s, self.user, self.descr, self.ext])
+
+    def headerlen(self):
+        return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
 
 
 def cleanup(filename):