[Zope-Checkins] CVS: ZODB3/ZODB - FileStorage.py:1.131

Jeremy Hylton jeremy@zope.com
Thu, 1 May 2003 13:10:20 -0400


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10554

Modified Files:
	FileStorage.py 
Log Message:
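Use the new fspack module: FileStorage.pack() now delegates the packing work to ZODB.fspack.FileStoragePacker instead of doing it inline.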


=== ZODB3/ZODB/FileStorage.py 1.130 => 1.131 ===
--- ZODB3/ZODB/FileStorage.py:1.130	Wed Apr 30 15:04:17 2003
+++ ZODB3/ZODB/FileStorage.py	Thu May  1 13:10:19 2003
@@ -135,6 +135,7 @@
 from ZODB.TimeStamp import TimeStamp
 from ZODB.lock_file import LockFile
 from ZODB.utils import p64, u64, cp, z64
+from ZODB.fspack import FileStoragePacker
 
 try:
     from ZODB.fsIndex import fsIndex
@@ -1430,382 +1431,57 @@
         Also, data back pointers that point before packtss are resolved and
         the associated data are copied, since the old records are not copied.
         """
-
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        # Ugh, this seems long
-
-        packing=1 # are we in the packing phase (or the copy phase)
-        locked=0
-        _lock_acquire=self._lock_acquire
-        _lock_release=self._lock_release
-        _commit_lock_acquire=self._commit_lock_acquire
-        _commit_lock_release=self._commit_lock_release
-        index, vindex, tindex, tvindex = self._newIndexes()
-        name=self.__name__
-        file=open(name, 'rb')
+        
         stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
         if stop==z64: raise FileStorageError, 'Invalid pack time'
 
+        # If the storage is empty, there's nothing to do.
+        if not self._index:
+            return
+        
         # Record pack time so we don't undo while packing
-        _lock_acquire()
+        self._lock_acquire()
         try:
             if self._packt != z64:
                 # Already packing.
                 raise FileStorageError, 'Already packing'
-            self._packt = stop
+            self._packt = None
         finally:
-            _lock_release()
+            self._lock_release()
 
+        p = FileStoragePacker(self._file_name, stop,
+                              self._lock_acquire, self._lock_release,
+                              self._commit_lock_acquire,
+                              self._commit_lock_release)
         try:
-            ##################################################################
-            # Step 1, get index as of pack time that
-            # includes only referenced objects.
-
-            packpos, maxoid, ltid = read_index(
-                file, name, index, vindex, tindex, stop,
-                read_only=1,
-                )
-
-            if packpos == 4:
-                return
-            if self._redundant_pack(file, packpos):
+            opos = p.pack()
+            if opos is None:
                 return
-
-            rootl=[z64]
-            pop=rootl.pop
-            pindex=fsIndex()
-            referenced=pindex.has_key
-            _load=self._load
-            _loada=self._loada
-            v=None
-            while rootl:
-                oid=pop()
-                if referenced(oid): continue
-                try:
-                    p, v, nv = _loada(oid, index, file)
-                    referencesf(p, rootl)
-                    if nv:
-                        p, serial = _load(oid, '', index, file)
-                        referencesf(p, rootl)
-
-                    pindex[oid]=index[oid]
-                except KeyError:
-                    pindex[oid]=0
-                    error('Bad reference to %s', `(oid,v)`)
-                    # XXX This try/except frequently masks bugs in the
-                    # implementation.
-
-            ##################################################################
-            # Step 2, copy data and compute new index based on new positions.
-            index, vindex, tindex, tvindex = self._newIndexes()
-
-            ofile=open(name+'.pack', 'w+b')
-
-            # Index for non-version data.  This is a temporary structure
-            # to reduce I/O during packing
-            nvindex=fsIndex()
-
-            # Cache a bunch of methods
-            seek=file.seek
-            read=file.read
-            oseek=ofile.seek
-            write=ofile.write
-
-            index_get=index.get
-            vindex_get=vindex.get
-            pindex_get=pindex.get
-
-            # Initialize,
-            pv=z64
-            offset=0L  # the amount of space freed by packing
-            pos=opos=4L
-            oseek(0)
-            write(packed_version)
-
-            # Copy the data in two stages.  In the packing stage,
-            # we skip records that are non-current or that are for
-            # unreferenced objects. We also skip undone transactions.
-            #
-            # After the packing stage, we copy everything but undone
-            # transactions, however, we have to update various back pointers.
-            # We have to have the storage lock in the second phase to keep
-            # data from being changed while we're copying.
-            pnv=None
-            while 1:
-
-                # Check for end of packed records
-                if packing and pos >= packpos:
-                    # OK, we're done with the old stuff, now we have
-                    # to get the lock so we can copy the new stuff!
-                    offset=pos-opos
-                    if offset <= 0:
-                        # we didn't free any space, there's no point in
-                        # continuing
-                        ofile.close()
-                        file.close()
-                        os.remove(name+'.pack')
-                        return
-
-                    packing=0
-                    _commit_lock_acquire()
-                    _lock_acquire()
-                    locked=1
-                    self._packt=None # Prevent undo until we're done
-
-                # Read the transaction record
-                seek(pos)
-                h=read(TRANS_HDR_LEN)
-                if len(h) < TRANS_HDR_LEN: break
-                tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
-                if status=='c':
-                    # Oops. we found a checkpoint flag.
-                    break
-                tl=u64(stl)
-                tpos=pos
-                tend=tpos+tl
-
-                if status=='u':
-                    if not packing:
-                        # We rely below on a constant offset for unpacked
-                        # records. This assumption holds only if we copy
-                        # undone unpacked data. This is lame, but necessary
-                        # for now to squash a bug.
-                        write(h)
-                        tl=tl+8
-                        write(read(tl-TRANS_HDR_LEN))
-                        opos=opos+tl
-
-                    # Undone transaction, skip it
-                    pos=tend+8
-                    continue
-
-                otpos=opos # start pos of output trans
-
-                # write out the transaction record
-                status=packing and 'p' or ' '
-                write(h[:16]+status+h[17:])
-                thl=ul+dl+el
-                h=read(thl)
-                if len(h) != thl:
-                    raise PackError(opos)
-                write(h)
-                thl=TRANS_HDR_LEN+thl
-                pos=tpos+thl
-                opos=otpos+thl
-
-                while pos < tend:
-                    # Read the data records for this transaction
-
-                    seek(pos)
-                    h=read(DATA_HDR_LEN)
-                    oid,serial,sprev,stloc,vlen,splen = unpack(
-                        DATA_HDR, h)
-                    plen=u64(splen)
-                    dlen=DATA_HDR_LEN+(plen or 8)
-
-                    if vlen:
-                        dlen=dlen+(16+vlen)
-                        if packing and pindex_get(oid, 0) != pos:
-                            # This is not the most current record, or
-                            # the oid is no longer referenced so skip it.
-                            pos=pos+dlen
-                            continue
-
-                        pnv=u64(read(8))
-                        # skip position of previous version record
-                        seek(8,1)
-                        version=read(vlen)
-                        pv=p64(vindex_get(version, 0))
-                        vindex[version]=opos
-                    else:
-                        if packing:
-                            ppos=pindex_get(oid, 0)
-                            if ppos != pos:
-
-                                if not ppos:
-                                    # This object is no longer referenced
-                                    # so skip it.
-                                    pos=pos+dlen
-                                    continue
-
-                                # This is not the most current record
-                                # But maybe it's the most current committed
-                                # record.
-                                seek(ppos)
-                                ph=read(DATA_HDR_LEN)
-                                pdoid,ps,pp,pt,pvlen,pplen = unpack(
-                                    DATA_HDR, ph)
-                                if not pvlen:
-                                    # The most current record is committed, so
-                                    # we can toss this one
-                                    pos=pos+dlen
-                                    continue
-                                pnv=read(8)
-                                pnv=_loadBackPOS(file, oid, pnv)
-                                if pnv > pos:
-                                    # The current non version data is later,
-                                    # so this isn't the current record
-                                    pos=pos+dlen
-                                    continue
-
-                                # Ok, we've gotten this far, so we have
-                                # the current record and we're ready to
-                                # read the pickle, but we're in the wrong
-                                # place, after wandering around to figure
-                                # out is we were current. Seek back
-                                # to pickle data:
-                                seek(pos+DATA_HDR_LEN)
-
-                            nvindex[oid]=opos
-
-                    tindex[oid]=opos
-
-                    opos=opos+dlen
-                    pos=pos+dlen
-
-                    if plen:
-                        p=read(plen)
-                    else:
-                        p=read(8)
-                        if packing:
-                            # When packing we resolve back pointers!
-                            p, serial = _loadBack(file, oid, p)
-                            plen=len(p)
-                            opos=opos+plen-8
-                            splen=p64(plen)
-                        else:
-                            p=u64(p)
-                            if p < packpos:
-                                # We have a backpointer to a
-                                # non-packed record. We have to be
-                                # careful.  If we were pointing to a
-                                # current record, then we should still
-                                # point at one, otherwise, we should
-                                # point at the last non-version record.
-                                ppos=pindex_get(oid, 0)
-                                if ppos:
-                                    if ppos==p:
-                                        # we were pointing to the
-                                        # current record
-                                        p=index[oid]
-                                    else:
-                                        p=nvindex[oid]
-                                else:
-                                    # Oops, this object was modified
-                                    # in a version in which it was deleted.
-                                    # Hee hee. It doesn't matter what we
-                                    # use cause it's not reachable any more.
-                                    p=0
-                            else:
-                                # This points back to a non-packed record.
-                                # Just adjust for the offset
-                                p=p-offset
-                            p=p64(p)
-
-                    sprev=p64(index_get(oid, 0))
-                    write(pack(DATA_HDR,
-                               oid,serial,sprev,p64(otpos),vlen,splen))
-                    if vlen:
-                        if not pnv:
-                            write(z64)
-                        else:
-                            if pnv < packpos:
-                                # we need to point to the packed
-                                # non-version rec
-                                pnv=nvindex[oid]
-                            else:
-                                # we just need to adjust the pointer
-                                # with the offset
-                                pnv=pnv-offset
-
-                            write(p64(pnv))
-                        write(pv)
-                        write(version)
-
-                    write(p)
-
-                # skip the (intentionally redundant) transaction length
-                pos=pos+8
-
-                if locked:
-                    # temporarily release the lock to give other threads
-                    # a chance to do some work!
-                    _commit_lock_release()
-                    _lock_release()
-                    locked=0
-
-                index.update(tindex) # Record the position
-                tindex.clear()
-
-                # Now, maybe we need to hack or delete the transaction
-                otl=opos-otpos
-                if otl != tl:
-                    # Oops, what came out is not what came in!
-
-                    # Check for empty:
-                    if otl==thl:
-                        # Empty, slide back over the header:
-                        opos=otpos
-                        oseek(opos)
-                    else:
-                        # Not empty, but we need to adjust transaction length
-                        # and update the status
-                        oseek(otpos+8)
-                        otl=p64(otl)
-                        write(otl+status)
-                        oseek(opos)
-                        write(otl)
-                        opos=opos+8
-
-                else:
-                    write(p64(otl))
-                    opos=opos+8
-
-
-                if not packing:
-                    # We are in the copying phase. We need to get the lock
-                    # again to avoid someone writing data while we read it.
-                    _commit_lock_acquire()
-                    _lock_acquire()
-                    locked=1
-
-
-            # OK, we've copied everything. Now we need to wrap things
-            # up.
-
-            # Hack the files around.
-            name=self.__name__
-
-            ofile.flush()
-            ofile.close()
-            file.close()
+            oldpath = self._file_name + ".old"
             self._file.close()
             try:
-                if os.path.exists(name+'.old'):
-                    os.remove(name+'.old')
-                os.rename(name, name+'.old')
-            except:
-                # Waaa
-                self._file=open(name,'r+b')
+                if os.path.exists(oldpath):
+                    os.remove(oldpath)
+                os.rename(self._file_name, oldpath)
+            except Exception, msg:
+                self._file = open(self._file_name, 'r+b')
                 raise
 
             # OK, we're beyond the point of no return
-            os.rename(name+'.pack', name)
-            self._file=open(name,'r+b')
-            self._initIndex(index, vindex, tindex, tvindex)
-            self._pos=opos
+            os.rename(self._file_name + '.pack', self._file_name)
+            self._file = open(self._file_name, 'r+b')
+            self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+            self._pos = opos
             self._save_index()
-
         finally:
-
-            if locked:
-                _commit_lock_release()
-                _lock_release()
-
-            _lock_acquire()
-            self._packt=z64
-            _lock_release()
+            if p.locked:
+                self._commit_lock_release()
+                self._lock_release()
+            self._lock_acquire()
+            self._packt = z64
+            self._lock_release()
 
     def iterator(self, start=None, stop=None):
         return FileIterator(self._file_name, start, stop)