[Zodb-checkins] CVS: Zope/lib/python/ZODB - fspack.py:1.5.4.1 FileStorage.py:1.98.2.9
Jeremy Hylton
jeremy at zope.com
Tue May 13 15:04:52 EDT 2003
Update of /cvs-repository/Zope/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv20467/lib/python/ZODB
Modified Files:
Tag: Zope-2_6-branch
FileStorage.py
Added Files:
Tag: Zope-2_6-branch
fspack.py
Log Message:
Backport new pack implementation and MTStorage fix from ZODB3-3_1-branch.
=== Added File Zope/lib/python/ZODB/fspack.py === (729/829 lines abridged)
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""FileStorage helper to perform pack.
A storage contains an ordered set of object revisions. When a storage
is packed, object revisions that are not reachable as of the pack time
are deleted. The notion of reachability is complicated by
backpointers -- object revisions that point to earlier revisions of
the same object.
An object revisions is reachable at a certain time if it is reachable
from the revision of the root at that time or if it is reachable from
a backpointer after that time.
"""
# This module contains code backported from ZODB4 from the
# zodb.storage.file package. It's been edited heavily to work with
# ZODB3 code and storage layout.
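
# Illustrative sketch, not part of the checked-in module: the reachability
# rule in the docstring above amounts to a graph walk from the root oid over
# the object references stored in each current pickle (the backpointer
# complication is ignored here).  `load_pickle` and `get_refs` are
# hypothetical stand-ins for the FileStorage internals the real packer uses.
def _reachable_as_of_pack_time(load_pickle, get_refs, root_oid='\0'*8):
    reachable = {}
    todo = [root_oid]
    while todo:
        oid = todo.pop()
        if reachable.has_key(oid):
            continue
        reachable[oid] = 1
        # follow every oid referenced by this object's current pickle
        todo.extend(get_refs(load_pickle(oid)))
    return reachable
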
import os
import struct
from types import StringType
from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try:
    from ZODB.fsIndex import fsIndex
except ImportError:
    def fsIndex():
        return {}

class CorruptedError(Exception):
    pass
z64='\0'*8
# the struct formats for the headers
TRANS_HDR = ">8s8scHHH"
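
# Illustrative helper, not part of the checked-in module.  TRANS_HDR
# describes the fixed 23-byte transaction header: an 8-byte transaction id,
# the 8-byte transaction length, a one-byte status code, and the lengths of
# the user, description and extension fields.
def _decode_txn_header_sketch(raw):
    # `raw` must contain at least the 23 fixed header bytes
    tid, stl, status, ul, dl, el = struct.unpack(TRANS_HDR, raw[:23])
    return tid, u64(stl), status, ul, dl, el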
[-=- -=- -=- 729 lines omitted -=- -=- -=-]
        if h.version:
            h.pnv = self.index.get(h.oid, 0)
            h.vprev = self.vindex.get(h.version, 0)
            self.vindex[h.version] = pos
        self.index[h.oid] = pos
        if h.version:
            self.vindex[h.version] = pos
        self._tfile.write(h.asString())
        self._tfile.write(data)
        if not data:
            # Packed records never have backpointers (?).
            # If there is no data, write a z64 backpointer.
            # This is a George Bailey event.
            self._tfile.write(z64)
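            # Illustration only: a reader of the packed file sees plen == 0
            # plus this z64 back pointer and treats the object as having no
            # data at this revision, as if its creation had been undone.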

    def copyRest(self, ipos):
        # After the pack time, all data records are copied.
        # Copy one txn at a time, using copy() for data.
        while ipos < self.file_end:
            th = self._read_txn_header(ipos)
            pos = self._tfile.tell()
            self._copier.setTxnPos(pos)
            self._tfile.write(th.asString())
            tend = ipos + th.tlen
            ipos += th.headerlen()

            while ipos < tend:
                h = self._read_data_header(ipos)
                ipos += h.recordlen()
                prev_txn = None
                if h.plen:
                    data = self._file.read(h.plen)
                else:
                    data = self.fetchBackpointer(h.oid, h.back)
                    if h.back:
                        prev_txn = self.getTxnFromData(h.oid, h.back)

                self._copier.copy(h.oid, h.serial, data, h.version,
                                  prev_txn, pos, self._tfile.tell())

            tlen = self._tfile.tell() - pos
            assert tlen == th.tlen
            self._tfile.write(p64(tlen))
            ipos += 8

            self.index.update(self.tindex)
            self.tindex.clear()
            self.vindex.update(self.tvindex)
            self.tvindex.clear()
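
The loop in copyRest relies on the FileStorage framing in which every
transaction record carries its length twice: once in the header (th.tlen)
and once in a redundant 8-byte trailer, so the read position advances by
th.tlen + 8 per transaction.  A minimal sketch of that walk, with
read_txn_header as a hypothetical stand-in for the packer's
_read_txn_header:

    def transaction_offsets(read_txn_header, pos, file_end):
        # collect the file offset of every transaction record
        offsets = []
        while pos < file_end:
            th = read_txn_header(pos)
            offsets.append(pos)
            # skip the data records plus the redundant 8-byte length
            pos = pos + th.tlen + 8
        return offsets
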
=== Zope/lib/python/ZODB/FileStorage.py 1.98.2.8 => 1.98.2.9 ===
--- Zope/lib/python/ZODB/FileStorage.py:1.98.2.8 Wed Apr 30 15:10:02 2003
+++ Zope/lib/python/ZODB/FileStorage.py Tue May 13 14:04:21 2003
@@ -135,6 +135,7 @@
from ZODB.TimeStamp import TimeStamp
from ZODB.lock_file import lock_file
from ZODB.utils import t32, p64, U64, cp
+from ZODB.fspack import FileStoragePacker
try:
from ZODB.fsIndex import fsIndex
@@ -1474,384 +1475,57 @@
Also, data back pointers that point before packtss are resolved and
the associated data are copied, since the old records are not copied.
"""
-
if self._is_read_only:
raise POSException.ReadOnlyError()
- # Ugh, this seems long
-
- packing=1 # are we in the packing phase (or the copy phase)
- locked=0
- _lock_acquire=self._lock_acquire
- _lock_release=self._lock_release
- _commit_lock_acquire=self._commit_lock_acquire
- _commit_lock_release=self._commit_lock_release
- index, vindex, tindex, tvindex = self._newIndexes()
- name=self.__name__
- file=open(name, 'rb')
+
stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
if stop==z64: raise FileStorageError, 'Invalid pack time'
+ # If the storage is empty, there's nothing to do.
+ if not self._index:
+ return
+
# Record pack time so we don't undo while packing
- _lock_acquire()
+ self._lock_acquire()
try:
if self._packt != z64:
# Already packing.
raise FileStorageError, 'Already packing'
- self._packt = stop
+ self._packt = None
finally:
- _lock_release()
+ self._lock_release()
+ p = FileStoragePacker(self._file_name, stop,
+ self._lock_acquire, self._lock_release,
+ self._commit_lock_acquire,
+ self._commit_lock_release)
try:
- ##################################################################
- # Step 1, get index as of pack time that
- # includes only referenced objects.
-
- packpos, maxoid, ltid = read_index(
- file, name, index, vindex, tindex, stop,
- read_only=1,
- )
-
- if packpos == 4:
+ opos = p.pack()
+ if opos is None:
return
- if self._redundant_pack(file, packpos):
- raise FileStorageError, (
- 'The database has already been packed to a later time\n'
- 'or no changes have been made since the last pack')
-
- rootl=[z64]
- pop=rootl.pop
- pindex=fsIndex()
- referenced=pindex.has_key
- _load=self._load
- _loada=self._loada
- v=None
- while rootl:
- oid=pop()
- if referenced(oid): continue
- try:
- p, v, nv = _loada(oid, index, file)
- referencesf(p, rootl)
- if nv:
- p, serial = _load(oid, '', index, file)
- referencesf(p, rootl)
-
- pindex[oid]=index[oid]
- except:
- pindex[oid]=0
- error('Bad reference to %s', `(oid,v)`)
-
- spackpos=p64(packpos)
-
- ##################################################################
- # Step 2, copy data and compute new index based on new positions.
- index, vindex, tindex, tvindex = self._newIndexes()
-
- ofile=open(name+'.pack', 'w+b')
-
- # Index for non-version data. This is a temporary structure
- # to reduce I/O during packing
- nvindex=fsIndex()
-
- # Cache a bunch of methods
- seek=file.seek
- read=file.read
- oseek=ofile.seek
- write=ofile.write
-
- index_get=index.get
- vindex_get=vindex.get
- pindex_get=pindex.get
-
- # Initialize,
- pv=z64
- offset=0L # the amount of space freed by packing
- pos=opos=4L
- oseek(0)
- write(packed_version)
-
- # Copy the data in two stages. In the packing stage,
- # we skip records that are non-current or that are for
- # unreferenced objects. We also skip undone transactions.
- #
- # After the packing stage, we copy everything but undone
- # transactions, however, we have to update various back pointers.
- # We have to have the storage lock in the second phase to keep
- # data from being changed while we're copying.
- pnv=None
- while 1:
-
- # Check for end of packed records
- if packing and pos >= packpos:
- # OK, we're done with the old stuff, now we have
- # to get the lock so we can copy the new stuff!
- offset=pos-opos
- if offset <= 0:
- # we didn't free any space, there's no point in
- # continuing
- ofile.close()
- file.close()
- os.remove(name+'.pack')
- return
-
- packing=0
- _commit_lock_acquire()
- _lock_acquire()
- locked=1
- self._packt=None # Prevent undo until we're done
-
- # Read the transaction record
- seek(pos)
- h=read(TRANS_HDR_LEN)
- if len(h) < TRANS_HDR_LEN: break
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
- if status=='c':
- # Oops. we found a checkpoint flag.
- break
- tl=U64(stl)
- tpos=pos
- tend=tpos+tl
-
- if status=='u':
- if not packing:
- # We rely below on a constant offset for unpacked
- # records. This assumption holds only if we copy
- # undone unpacked data. This is lame, but necessary
- # for now to squash a bug.
- write(h)
- tl=tl+8
- write(read(tl-TRANS_HDR_LEN))
- opos=opos+tl
-
- # Undone transaction, skip it
- pos=tend+8
- continue
-
- otpos=opos # start pos of output trans
-
- # write out the transaction record
- status=packing and 'p' or ' '
- write(h[:16]+status+h[17:])
- thl=ul+dl+el
- h=read(thl)
- if len(h) != thl:
- raise 'Pack Error', opos
- write(h)
- thl=TRANS_HDR_LEN+thl
- pos=tpos+thl
- opos=otpos+thl
-
- while pos < tend:
- # Read the data records for this transaction
-
- seek(pos)
- h=read(DATA_HDR_LEN)
- oid,serial,sprev,stloc,vlen,splen = unpack(
- DATA_HDR, h)
- plen=U64(splen)
- dlen=DATA_HDR_LEN+(plen or 8)
-
- if vlen:
- dlen=dlen+(16+vlen)
- if packing and pindex_get(oid, 0) != pos:
- # This is not the most current record, or
- # the oid is no longer referenced so skip it.
- pos=pos+dlen
- continue
-
- pnv=U64(read(8))
- # skip position of previous version record
- seek(8,1)
- version=read(vlen)
- pv=p64(vindex_get(version, 0))
- vindex[version]=opos
- else:
- if packing:
- ppos=pindex_get(oid, 0)
- if ppos != pos:
-
- if not ppos:
- # This object is no longer referenced
- # so skip it.
- pos=pos+dlen
- continue
-
- # This is not the most current record
- # But maybe it's the most current committed
- # record.
- seek(ppos)
- ph=read(DATA_HDR_LEN)
- pdoid,ps,pp,pt,pvlen,pplen = unpack(
- DATA_HDR, ph)
- if not pvlen:
- # The most current record is committed, so
- # we can toss this one
- pos=pos+dlen
- continue
- pnv=read(8)
- pnv=_loadBackPOS(file, oid, pnv)
- if pnv > pos:
- # The current non version data is later,
- # so this isn't the current record
- pos=pos+dlen
- continue
-
- # Ok, we've gotten this far, so we have
- # the current record and we're ready to
- # read the pickle, but we're in the wrong
- # place, after wandering around to figure
- # out is we were current. Seek back
- # to pickle data:
- seek(pos+DATA_HDR_LEN)
-
- nvindex[oid]=opos
-
- tindex[oid]=opos
-
- opos=opos+dlen
- pos=pos+dlen
-
- if plen:
- p=read(plen)
- else:
- p=read(8)
- if packing:
- # When packing we resolve back pointers!
- p, serial = _loadBack(file, oid, p)
- plen=len(p)
- opos=opos+plen-8
- splen=p64(plen)
- else:
- p=U64(p)
- if p < packpos:
- # We have a backpointer to a
- # non-packed record. We have to be
- # careful. If we were pointing to a
- # current record, then we should still
- # point at one, otherwise, we should
- # point at the last non-version record.
- ppos=pindex_get(oid, 0)
- if ppos:
- if ppos==p:
- # we were pointing to the
- # current record
- p=index[oid]
- else:
- p=nvindex[oid]
- else:
- # Oops, this object was modified
- # in a version in which it was deleted.
- # Hee hee. It doesn't matter what we
- # use cause it's not reachable any more.
- p=0
- else:
- # This points back to a non-packed record.
- # Just adjust for the offset
- p=p-offset
- p=p64(p)
-
- sprev=p64(index_get(oid, 0))
- write(pack(DATA_HDR,
- oid,serial,sprev,p64(otpos),vlen,splen))
- if vlen:
- if not pnv:
- write(z64)
- else:
- if pnv < packpos:
- # we need to point to the packed
- # non-version rec
- pnv=nvindex[oid]
- else:
- # we just need to adjust the pointer
- # with the offset
- pnv=pnv-offset
-
- write(p64(pnv))
- write(pv)
- write(version)
-
- write(p)
-
- # skip the (intentionally redundant) transaction length
- pos=pos+8
-
- if locked:
- # temporarily release the lock to give other threads
- # a chance to do some work!
- _commit_lock_release()
- _lock_release()
- locked=0
-
- index.update(tindex) # Record the position
- tindex.clear()
-
- # Now, maybe we need to hack or delete the transaction
- otl=opos-otpos
- if otl != tl:
- # Oops, what came out is not what came in!
-
- # Check for empty:
- if otl==thl:
- # Empty, slide back over the header:
- opos=otpos
- oseek(opos)
- else:
- # Not empty, but we need to adjust transaction length
- # and update the status
- oseek(otpos+8)
- otl=p64(otl)
- write(otl+status)
- oseek(opos)
- write(otl)
- opos=opos+8
-
- else:
- write(p64(otl))
- opos=opos+8
-
-
- if not packing:
- # We are in the copying phase. We need to get the lock
- # again to avoid someone writing data while we read it.
- _commit_lock_acquire()
- _lock_acquire()
- locked=1
-
-
- # OK, we've copied everything. Now we need to wrap things
- # up.
-
- # Hack the files around.
- name=self.__name__
-
- ofile.flush()
- ofile.close()
- file.close()
+ oldpath = self._file_name + ".old"
self._file.close()
try:
- if os.path.exists(name+'.old'):
- os.remove(name+'.old')
- os.rename(name, name+'.old')
- except:
- # Waaa
- self._file=open(name,'r+b')
+ if os.path.exists(oldpath):
+ os.remove(oldpath)
+ os.rename(self._file_name, oldpath)
+ except Exception, msg:
+ self._file = open(self._file_name, 'r+b')
raise
# OK, we're beyond the point of no return
- os.rename(name+'.pack', name)
- self._file=open(name,'r+b')
- self._initIndex(index, vindex, tindex, tvindex)
- self._pos=opos
+ os.rename(self._file_name + '.pack', self._file_name)
+ self._file = open(self._file_name, 'r+b')
+ self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+ self._pos = opos
self._save_index()
-
finally:
-
- if locked:
- _commit_lock_release()
- _lock_release()
-
- _lock_acquire()
- self._packt=z64
- _lock_release()
+ if p.locked:
+ self._commit_lock_release()
+ self._lock_release()
+ self._lock_acquire()
+ self._packt = z64
+ self._lock_release()
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)
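
The file handling in the new pack() follows a write-then-rename pattern: the
packer writes its output to the storage file name plus '.pack', the live file
is set aside under the '.old' suffix, and the packed copy is renamed into
place.  A condensed sketch of just that swap (hypothetical helper; locking,
index handling and reopening of self._file omitted):

    import os

    def swap_in_packed(path):
        packed = path + '.pack'
        old = path + '.old'
        if os.path.exists(old):
            os.remove(old)
        os.rename(path, old)      # keep the unpacked file as a backup
        os.rename(packed, path)   # past this point the pack is committed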