[Zope-Checkins] CVS: ZODB3/ZODB - FileStorage.py:1.131
Jeremy Hylton
jeremy@zope.com
Thu, 1 May 2003 13:10:20 -0400
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10554
Modified Files:
FileStorage.py
Log Message:
Use new fspack.
=== ZODB3/ZODB/FileStorage.py 1.130 => 1.131 ===
--- ZODB3/ZODB/FileStorage.py:1.130 Wed Apr 30 15:04:17 2003
+++ ZODB3/ZODB/FileStorage.py Thu May 1 13:10:19 2003
@@ -135,6 +135,7 @@
from ZODB.TimeStamp import TimeStamp
from ZODB.lock_file import LockFile
from ZODB.utils import p64, u64, cp, z64
+from ZODB.fspack import FileStoragePacker
try:
from ZODB.fsIndex import fsIndex
@@ -1430,382 +1431,57 @@
Also, data back pointers that point before packtss are resolved and
the associated data are copied, since the old records are not copied.
"""
-
if self._is_read_only:
raise POSException.ReadOnlyError()
- # Ugh, this seems long
-
- packing=1 # are we in the packing phase (or the copy phase)
- locked=0
- _lock_acquire=self._lock_acquire
- _lock_release=self._lock_release
- _commit_lock_acquire=self._commit_lock_acquire
- _commit_lock_release=self._commit_lock_release
- index, vindex, tindex, tvindex = self._newIndexes()
- name=self.__name__
- file=open(name, 'rb')
+
stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
if stop==z64: raise FileStorageError, 'Invalid pack time'
+ # If the storage is empty, there's nothing to do.
+ if not self._index:
+ return
+
# Record pack time so we don't undo while packing
- _lock_acquire()
+ self._lock_acquire()
try:
if self._packt != z64:
# Already packing.
raise FileStorageError, 'Already packing'
- self._packt = stop
+ self._packt = None
finally:
- _lock_release()
+ self._lock_release()
+ p = FileStoragePacker(self._file_name, stop,
+ self._lock_acquire, self._lock_release,
+ self._commit_lock_acquire,
+ self._commit_lock_release)
try:
- ##################################################################
- # Step 1, get index as of pack time that
- # includes only referenced objects.
-
- packpos, maxoid, ltid = read_index(
- file, name, index, vindex, tindex, stop,
- read_only=1,
- )
-
- if packpos == 4:
- return
- if self._redundant_pack(file, packpos):
+ opos = p.pack()
+ if opos is None:
return
-
- rootl=[z64]
- pop=rootl.pop
- pindex=fsIndex()
- referenced=pindex.has_key
- _load=self._load
- _loada=self._loada
- v=None
- while rootl:
- oid=pop()
- if referenced(oid): continue
- try:
- p, v, nv = _loada(oid, index, file)
- referencesf(p, rootl)
- if nv:
- p, serial = _load(oid, '', index, file)
- referencesf(p, rootl)
-
- pindex[oid]=index[oid]
- except KeyError:
- pindex[oid]=0
- error('Bad reference to %s', `(oid,v)`)
- # XXX This try/except frequently masks bugs in the
- # implementation.
-
- ##################################################################
- # Step 2, copy data and compute new index based on new positions.
- index, vindex, tindex, tvindex = self._newIndexes()
-
- ofile=open(name+'.pack', 'w+b')
-
- # Index for non-version data. This is a temporary structure
- # to reduce I/O during packing
- nvindex=fsIndex()
-
- # Cache a bunch of methods
- seek=file.seek
- read=file.read
- oseek=ofile.seek
- write=ofile.write
-
- index_get=index.get
- vindex_get=vindex.get
- pindex_get=pindex.get
-
- # Initialize,
- pv=z64
- offset=0L # the amount of space freed by packing
- pos=opos=4L
- oseek(0)
- write(packed_version)
-
- # Copy the data in two stages. In the packing stage,
- # we skip records that are non-current or that are for
- # unreferenced objects. We also skip undone transactions.
- #
- # After the packing stage, we copy everything but undone
- # transactions, however, we have to update various back pointers.
- # We have to have the storage lock in the second phase to keep
- # data from being changed while we're copying.
- pnv=None
- while 1:
-
- # Check for end of packed records
- if packing and pos >= packpos:
- # OK, we're done with the old stuff, now we have
- # to get the lock so we can copy the new stuff!
- offset=pos-opos
- if offset <= 0:
- # we didn't free any space, there's no point in
- # continuing
- ofile.close()
- file.close()
- os.remove(name+'.pack')
- return
-
- packing=0
- _commit_lock_acquire()
- _lock_acquire()
- locked=1
- self._packt=None # Prevent undo until we're done
-
- # Read the transaction record
- seek(pos)
- h=read(TRANS_HDR_LEN)
- if len(h) < TRANS_HDR_LEN: break
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
- if status=='c':
- # Oops. we found a checkpoint flag.
- break
- tl=u64(stl)
- tpos=pos
- tend=tpos+tl
-
- if status=='u':
- if not packing:
- # We rely below on a constant offset for unpacked
- # records. This assumption holds only if we copy
- # undone unpacked data. This is lame, but necessary
- # for now to squash a bug.
- write(h)
- tl=tl+8
- write(read(tl-TRANS_HDR_LEN))
- opos=opos+tl
-
- # Undone transaction, skip it
- pos=tend+8
- continue
-
- otpos=opos # start pos of output trans
-
- # write out the transaction record
- status=packing and 'p' or ' '
- write(h[:16]+status+h[17:])
- thl=ul+dl+el
- h=read(thl)
- if len(h) != thl:
- raise PackError(opos)
- write(h)
- thl=TRANS_HDR_LEN+thl
- pos=tpos+thl
- opos=otpos+thl
-
- while pos < tend:
- # Read the data records for this transaction
-
- seek(pos)
- h=read(DATA_HDR_LEN)
- oid,serial,sprev,stloc,vlen,splen = unpack(
- DATA_HDR, h)
- plen=u64(splen)
- dlen=DATA_HDR_LEN+(plen or 8)
-
- if vlen:
- dlen=dlen+(16+vlen)
- if packing and pindex_get(oid, 0) != pos:
- # This is not the most current record, or
- # the oid is no longer referenced so skip it.
- pos=pos+dlen
- continue
-
- pnv=u64(read(8))
- # skip position of previous version record
- seek(8,1)
- version=read(vlen)
- pv=p64(vindex_get(version, 0))
- vindex[version]=opos
- else:
- if packing:
- ppos=pindex_get(oid, 0)
- if ppos != pos:
-
- if not ppos:
- # This object is no longer referenced
- # so skip it.
- pos=pos+dlen
- continue
-
- # This is not the most current record
- # But maybe it's the most current committed
- # record.
- seek(ppos)
- ph=read(DATA_HDR_LEN)
- pdoid,ps,pp,pt,pvlen,pplen = unpack(
- DATA_HDR, ph)
- if not pvlen:
- # The most current record is committed, so
- # we can toss this one
- pos=pos+dlen
- continue
- pnv=read(8)
- pnv=_loadBackPOS(file, oid, pnv)
- if pnv > pos:
- # The current non version data is later,
- # so this isn't the current record
- pos=pos+dlen
- continue
-
- # Ok, we've gotten this far, so we have
- # the current record and we're ready to
- # read the pickle, but we're in the wrong
- # place, after wandering around to figure
- # out is we were current. Seek back
- # to pickle data:
- seek(pos+DATA_HDR_LEN)
-
- nvindex[oid]=opos
-
- tindex[oid]=opos
-
- opos=opos+dlen
- pos=pos+dlen
-
- if plen:
- p=read(plen)
- else:
- p=read(8)
- if packing:
- # When packing we resolve back pointers!
- p, serial = _loadBack(file, oid, p)
- plen=len(p)
- opos=opos+plen-8
- splen=p64(plen)
- else:
- p=u64(p)
- if p < packpos:
- # We have a backpointer to a
- # non-packed record. We have to be
- # careful. If we were pointing to a
- # current record, then we should still
- # point at one, otherwise, we should
- # point at the last non-version record.
- ppos=pindex_get(oid, 0)
- if ppos:
- if ppos==p:
- # we were pointing to the
- # current record
- p=index[oid]
- else:
- p=nvindex[oid]
- else:
- # Oops, this object was modified
- # in a version in which it was deleted.
- # Hee hee. It doesn't matter what we
- # use cause it's not reachable any more.
- p=0
- else:
- # This points back to a non-packed record.
- # Just adjust for the offset
- p=p-offset
- p=p64(p)
-
- sprev=p64(index_get(oid, 0))
- write(pack(DATA_HDR,
- oid,serial,sprev,p64(otpos),vlen,splen))
- if vlen:
- if not pnv:
- write(z64)
- else:
- if pnv < packpos:
- # we need to point to the packed
- # non-version rec
- pnv=nvindex[oid]
- else:
- # we just need to adjust the pointer
- # with the offset
- pnv=pnv-offset
-
- write(p64(pnv))
- write(pv)
- write(version)
-
- write(p)
-
- # skip the (intentionally redundant) transaction length
- pos=pos+8
-
- if locked:
- # temporarily release the lock to give other threads
- # a chance to do some work!
- _commit_lock_release()
- _lock_release()
- locked=0
-
- index.update(tindex) # Record the position
- tindex.clear()
-
- # Now, maybe we need to hack or delete the transaction
- otl=opos-otpos
- if otl != tl:
- # Oops, what came out is not what came in!
-
- # Check for empty:
- if otl==thl:
- # Empty, slide back over the header:
- opos=otpos
- oseek(opos)
- else:
- # Not empty, but we need to adjust transaction length
- # and update the status
- oseek(otpos+8)
- otl=p64(otl)
- write(otl+status)
- oseek(opos)
- write(otl)
- opos=opos+8
-
- else:
- write(p64(otl))
- opos=opos+8
-
-
- if not packing:
- # We are in the copying phase. We need to get the lock
- # again to avoid someone writing data while we read it.
- _commit_lock_acquire()
- _lock_acquire()
- locked=1
-
-
- # OK, we've copied everything. Now we need to wrap things
- # up.
-
- # Hack the files around.
- name=self.__name__
-
- ofile.flush()
- ofile.close()
- file.close()
+ oldpath = self._file_name + ".old"
self._file.close()
try:
- if os.path.exists(name+'.old'):
- os.remove(name+'.old')
- os.rename(name, name+'.old')
- except:
- # Waaa
- self._file=open(name,'r+b')
+ if os.path.exists(oldpath):
+ os.remove(oldpath)
+ os.rename(self._file_name, oldpath)
+ except Exception, msg:
+ self._file = open(self._file_name, 'r+b')
raise
# OK, we're beyond the point of no return
- os.rename(name+'.pack', name)
- self._file=open(name,'r+b')
- self._initIndex(index, vindex, tindex, tvindex)
- self._pos=opos
+ os.rename(self._file_name + '.pack', self._file_name)
+ self._file = open(self._file_name, 'r+b')
+ self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+ self._pos = opos
self._save_index()
-
finally:
-
- if locked:
- _commit_lock_release()
- _lock_release()
-
- _lock_acquire()
- self._packt=z64
- _lock_release()
+ if p.locked:
+ self._commit_lock_release()
+ self._lock_release()
+ self._lock_acquire()
+ self._packt = z64
+ self._lock_release()
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)