[Zodb-checkins] CVS: ZODB3/ZODB/FileStorage -
FileStorage.py:1.1.2.2 format.py:1.1.2.3
Jeremy Hylton
jeremy at zope.com
Thu Oct 9 16:26:54 EDT 2003
Update of /cvs-repository/ZODB3/ZODB/FileStorage
In directory cvs.zope.org:/tmp/cvs-serv28627/ZODB/FileStorage
Modified Files:
Tag: ZODB3-mvcc-2-branch
FileStorage.py format.py
Log Message:
Convert large portion of FileStorage to use the format helper methods.
I see a few storage test failures, but those may be the same ones that
I was seeing because of problems in the MVCC implementation.
=== ZODB3/ZODB/FileStorage/FileStorage.py 1.1.2.1 => 1.1.2.2 ===
--- ZODB3/ZODB/FileStorage/FileStorage.py:1.1.2.1 Tue Oct 7 14:16:34 2003
+++ ZODB3/ZODB/FileStorage/FileStorage.py Thu Oct 9 16:26:23 2003
@@ -137,7 +137,10 @@
from ZODB.lock_file import LockFile
from ZODB.utils import p64, u64, cp, z64
from ZODB.fspack import FileStoragePacker
-from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
+from ZODB.FileStorage.format \
+ import FileStorageFormatter, DataHeader, TxnHeader, DATA_HDR, \
+ DATA_HDR_LEN, TRANS_HDR, TRANS_HDR_LEN, CorruptedDataError, \
+ DATA_VERSION_HDR_LEN
try:
from ZODB.fsIndex import fsIndex
@@ -148,16 +151,8 @@
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
t32 = 1L << 32
-# the struct formats for the headers
-TRANS_HDR = ">8s8scHHH"
-DATA_HDR = ">8s8s8s8sH8s"
-# constants to support various header sizes
-TRANS_HDR_LEN = 23
-DATA_HDR_LEN = 42
-DATA_VERSION_HDR_LEN = 58
-assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
-assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
+packed_version = "FS21"
def blather(message, *data):
LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
@@ -199,18 +194,20 @@
class CorruptedTransactionError(CorruptedFileStorageError):
pass
-class CorruptedDataError(CorruptedFileStorageError):
- pass
-
class FileStorageQuotaError(FileStorageError,
POSException.StorageSystemError):
"""File storage quota exceeded."""
-packed_version='FS21'
+class TempFormatter(FileStorageFormatter):
+ """Helper class used to read formatted FileStorage data."""
+
+ def __init__(self, afile):
+ self._file = afile
class FileStorage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage,
FileStorageFormatter):
+
# default pack time is 0
_packt = z64
@@ -234,6 +231,7 @@
# Create the lock file
self._lock_file = LockFile(file_name + '.lock')
self._tfile = open(file_name + '.tmp', 'w+b')
+ self._tfmt = TempFormatter(self._tfile)
else:
self._tfile = None
@@ -395,14 +393,17 @@
with the index. Any invalid records or inconsistent
object positions cause zero to be returned.
"""
+ r = self._check_sanity(index, pos)
+ if not r:
+ warn("Ignoring index for %s", self._file_name)
+ return r
+
+ def _check_sanity(self, index, pos):
if pos < 100:
return 0 # insane
- file = self._file
- seek = file.seek
- read = file.read
- seek(0,2)
- if file.tell() < pos:
+ self._file.seek(0, 2)
+ if self._file.tell() < pos:
return 0 # insane
ltid = None
@@ -410,51 +411,41 @@
checked = 0
while checked < max_checked:
- seek(pos-8)
- rstl = read(8)
+ self._file.seek(pos - 8)
+ rstl = self._file.read(8)
tl = u64(rstl)
- pos = pos-tl-8
+ pos = pos - tl - 8
if pos < 4:
return 0 # insane
- seek(pos)
- s = read(TRANS_HDR_LEN)
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR, s)
+ h = self._read_txn_header(pos)
if not ltid:
- ltid = tid
- if stl != rstl:
+ ltid = h.tid
+ if h.tlen != tl:
return 0 # inconsistent lengths
- if status == 'u':
+ if h.status == 'u':
continue # undone trans, search back
- if status not in ' p':
+ if h.status not in ' p':
return 0 # insane
- if tl < (TRANS_HDR_LEN + ul + dl + el):
+ if tl < h.headerlen():
return 0 # insane
- tend = pos+tl
- opos = pos+(TRANS_HDR_LEN + ul + dl + el)
+ tend = pos + tl
+ opos = pos + h.headerlen()
if opos == tend:
continue # empty trans
while opos < tend and checked < max_checked:
# Read the data records for this transaction
- seek(opos)
- h = read(DATA_HDR_LEN)
- oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- tloc = u64(stloc)
- plen = u64(splen)
-
- dlen = DATA_HDR_LEN+(plen or 8)
- if vlen:
- dlen = dlen+(16+vlen)
+ h = self._read_data_header(opos)
- if opos+dlen > tend or tloc != pos:
- return 0 # insane
+ if opos + h.recordlen() > tend or h.tloc != pos:
+ return 0
- if index.get(oid, 0) != opos:
+ if index.get(h.oid, 0) != opos:
return 0 # insane
checked += 1
- opos = opos+dlen
+ opos = opos + h.recordlen()
return ltid
@@ -544,9 +535,9 @@
return result
def abortVersion(self, src, transaction):
- return self.commitVersion(src, '', transaction, abort=1)
+ return self.commitVersion(src, '', transaction, abort=True)
- def commitVersion(self, src, dest, transaction, abort=None):
+ def commitVersion(self, src, dest, transaction, abort=False):
# We are going to commit by simply storing back pointers.
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -571,7 +562,7 @@
finally:
self._lock_release()
- def _commitVersion(self, src, dest, transaction, abort=None):
+ def _commitVersion(self, src, dest, transaction, abort=False):
# call after checking arguments and acquiring lock
srcpos = self._vindex_get(src, 0)
spos = p64(srcpos)
@@ -592,114 +583,79 @@
current_oids = {}
t = None
tstatus = ' '
- if abort is None:
+ if not abort:
newserial = self._serial
while srcpos:
- self._file.seek(srcpos)
- h = self._file.read(DATA_VERSION_HDR_LEN)
- # h -> oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
- oid = h[:8]
- pnv = h[-16:-8]
+ h = self._read_data_header(srcpos)
if abort:
# If we are aborting, the serialno in the new data
# record should be the same as the serialno in the last
# non-version data record.
# XXX This might be the only time that the serialno
# of a data record does not match the transaction id.
- self._file.seek(u64(pnv))
- h_pnv = self._file.read(DATA_VERSION_HDR_LEN)
- newserial = h_pnv[8:16]
+ h_pnv = self._read_data_header(h.pnv)
+ serial = h_pnv.serial
+ else:
+ serial = newserial
- if self._index.get(oid) == srcpos:
+ if self._index.get(h.oid) == srcpos:
# This is a current record!
- self._tindex[oid] = here
- oids.append(oid)
- self._tfile.write(oid + newserial + spos + middle)
+ self._tindex[h.oid] = here
+ oids.append(h.oid)
+ self._tfile.write(h.oid + serial + spos + middle)
if dest:
self._tvindex[dest] = here
- self._tfile.write(pnv + sd + dest)
+ self._tfile.write(p64(h.pnv) + sd + dest)
sd = p64(here)
- self._tfile.write(abort and pnv or spos)
+ self._tfile.write(abort and p64(h.pnv) or spos)
# data backpointer to src data
here += heredelta
- current_oids[oid] = 1
-
+ current_oids[h.oid] = 1
else:
# Hm. This is a non-current record. Is there a
# current record for this oid?
- if not current_oids.has_key(oid):
- # Nope. We're done *if* this transaction wasn't undone.
- tloc = h[24:32]
- if t != tloc:
- # We haven't checked this transaction before,
- # get its status.
- t = tloc
- self._file.seek(u64(t) + 16)
- tstatus = self._file.read(1)
- if tstatus != 'u':
- # Yee ha! We can quit
- break
+ if not current_oids.has_key(h.oid):
+ break
- spos = h[-8:]
- srcpos = u64(spos)
+ srcpos = h.vprev
+ spos = p64(srcpos)
self._toid2serial_delete.update(current_oids)
return self._serial, oids
def getSize(self):
return self._pos
- def _load(self, oid, version, _index, file):
+ def _lookup_pos(self, oid):
try:
- pos = _index[oid]
+ return self._index[oid]
except KeyError:
raise POSKeyError(oid)
except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
- file.seek(pos)
- read = file.read
- h = read(DATA_HDR_LEN)
- doid, serial, prev, tloc, vlen, plen = unpack(DATA_HDR, h)
- if doid != oid:
- raise CorruptedDataError, h
- if vlen:
- pnv = read(8) # Read location of non-version data
- if (not version or len(version) != vlen or
- (read(8) # skip past version link
- and version != read(vlen))):
- return _loadBack(file, oid, pnv)
- else:
- # The most recent record is for non-version data -- cache
- # the serialno.
- self._oid2serial[oid] = serial
-
- # If we get here, then either this was not a version record,
- # or we've already read past the version data!
- if plen != z64:
- return read(u64(plen)), serial
- pnv = read(8)
- # We use the current serial, since that is the one that
- # will get checked when we store.
- return _loadBack(file, oid, pnv)[0], serial
+ raise TypeError("invalid oid %r" % (oid,))
def load(self, oid, version):
self._lock_acquire()
try:
- return self._load(oid, version, self._index, self._file)
+ pos = self._lookup_pos(oid)
+ h = self._read_data_header(pos, oid)
+ if h.version and h.version != version:
+ data, serial, _, _ = self._loadBack_impl(oid, h.pnv)
+ return data, serial
+ if h.plen:
+ return self._file.read(h.plen), h.serial
+ else:
+ data = self._loadBack_impl(oid, h.back)[0]
+ return data, h.serial
finally:
self._lock_release()
def loadSerial(self, oid, serial):
self._lock_acquire()
try:
- try:
- pos = self._index[oid]
- except KeyError:
- raise POSKeyError(oid)
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
+ pos = self._lookup_pos(oid)
while 1:
h = self._read_data_header(pos, oid)
if h.serial == serial:
@@ -746,7 +702,7 @@
if h.version:
if h.version != version:
raise VersionLockError(oid, h.version)
- pnv = h.pnv
+ pnv = h.pnv
cached_serial = h.serial
if serial != cached_serial:
@@ -796,34 +752,27 @@
# Must call with lock held.
self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN)
- tid, stl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
+ tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
self._file.read(ul + dl + el)
- tend = tpos + u64(stl) + 8
+ tend = tpos + tl + 8
pos = self._file.tell()
while pos < tend:
- h = self._file.read(DATA_HDR_LEN)
- _oid, serial, sprev, stpos, vl, sdl = struct.unpack(DATA_HDR, h)
- dl = u64(sdl)
- reclen = DATA_HDR_LEN + vl + dl
- if vl:
- reclen += 16
- if _oid == oid:
- if vl:
- self._file.read(vl + 16)
+ h = self._read_data_header(pos)
+ if h.oid == oid:
# Make sure this looks like the right data record
- if dl == 0:
+ if h.plen == 0:
# This is also a backpointer. Gotta trust it.
return pos
- if dl != len(data):
+ if h.plen != len(data):
# The expected data doesn't match what's in the
# backpointer. Something is wrong.
error("Mismatch between data and backpointer at %d", pos)
return 0
- _data = self._file.read(dl)
+ _data = self._file.read(h.plen)
if data != _data:
return 0
return pos
- pos += reclen
+ pos += h.recordlen()
self._file.seek(pos)
return 0
@@ -874,29 +823,22 @@
dlen = 0
else:
dlen = len(data)
+
# Write the recovery data record
- self._tfile.write(pack(DATA_HDR,
- oid, serial, p64(old), p64(self._pos),
- len(version), p64(dlen)))
- # We need to write some version information if this revision is
- # happening in a version.
+ new = DataHeader(oid, serial, old, self._pos, len(version), dlen)
if version:
- pnv = self._restore_pnv(oid, old, version, prev_pos)
- if pnv:
- self._tfile.write(pnv)
- else:
- self._tfile.write(p64(old))
- # Link to the last record for this version
- pv = self._tvindex.get(version, 0)
- if not pv:
- pv = self._vindex_get(version, 0)
- self._tfile.write(p64(pv))
+ pnv = self._restore_pnv(oid, old, version, prev_pos) or old
+ vprev = self._tvindex.get(version, 0)
+ if not vprev:
+ pv = self._vindex.get(version, 0)
+ new.setVersion(version, pnv, vprev)
self._tvindex[version] = here
- self._tfile.write(version)
self._toid2serial_delete[oid] = 1
else:
self._toid2serial[oid] = serial
-
+
+ self._tfile.write(new.asString())
+
# Finally, write the data or a backpointer.
if data is None:
if prev_pos:
@@ -917,41 +859,21 @@
if not prev:
return None
- pnv = None
-
# Load the record pointed to be prev
- self._file.seek(prev)
- h = self._file.read(DATA_HDR_LEN)
- doid, x, y, z, vlen, w = unpack(DATA_HDR, h)
- if doid != oid:
- raise CorruptedDataError, h
- # If the previous record is for a version, it must have
- # a valid pnv.
- if vlen > 0:
- pnv = self._file.read(8)
- pv = self._file.read(8)
- v = self._file.read(vlen)
- elif bp:
+ h = self._read_data_header(prev, oid)
+ if h.version:
+ return h.pnv
+ if h.back:
# XXX Not sure the following is always true:
# The previous record is not for this version, yet we
# have a backpointer to it. The current record must
# be an undo of an abort or commit, so the backpointer
# must be to a version record with a pnv.
- self._file.seek(bp)
- h2 = self._file.read(DATA_HDR_LEN)
- doid2, x, y, z, vlen2, sdl = unpack(DATA_HDR, h2)
- dl = u64(sdl)
- if oid != doid2:
- raise CorruptedDataError, h2
- if vlen2 > 0:
- pnv = self._file.read(8)
- pv = self._file.read(8)
- v = self._file.read(8)
- else:
- warn("restore could not find previous non-version data "
- "at %d or %d" % (prev, bp))
+ h2 = self._read_data_header(h.back, oid)
+ if h2.version:
+ return h2.pnv
- return pnv
+ return None
def supportsUndo(self):
return 1
@@ -991,33 +913,20 @@
if not dlen:
return # No data in this trans
self._tfile.seek(0)
- user, desc, ext = self._ude
- luser = len(user)
- ldesc = len(desc)
- lext = len(ext)
-
+ user, descr, ext = self._ude
+
self._file.seek(self._pos)
tl = self._thl + dlen
- stl = p64(tl)
try:
- # Note that we use a status of 'c', for checkpoint.
- # If this flag isn't cleared, anything after this is
- # suspect.
- self._file.write(pack(
- ">8s" "8s" "c" "H" "H" "H"
- ,self._serial, stl,'c', luser, ldesc, lext,
- ))
- if user:
- self._file.write(user)
- if desc:
- self._file.write(desc)
- if ext:
- self._file.write(ext)
-
+ h = TxnHeader(self._serial, tl, "c", len(user),
+ len(descr), len(ext))
+ h.user = user
+ h.descr = descr
+ h.ext = ext
+ self._file.write(h.asString())
cp(self._tfile, self._file, dlen)
-
- self._file.write(stl)
+ self._file.write(p64(tl))
self._file.flush()
except:
# Hm, an error occured writing out the data. Maybe the
@@ -1077,57 +986,34 @@
"""Return the serial, data pointer, data, and version for the oid
record at pos"""
if tpos:
- file=self._tfile
pos = tpos - self._pos - self._thl
- tpos=file.tell()
- else:
- file=self._file
-
- read=file.read
- file.seek(pos)
- h=read(DATA_HDR_LEN)
- roid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
- if roid != oid:
- raise UndoError('Invalid undo transaction id', oid)
- if vlen:
- read(16) # skip nv pointer and version previous pointer
- version=read(vlen)
+ tpos = self._tfile.tell()
+ h = self._tfmt._read_data_header(pos, oid)
+ afile = self._tfile
else:
- version=''
+ h = self._read_data_header(pos, oid)
+ afile = self._file
+ if h.oid != oid:
+ raise UndoError("Invalid undo transaction id", oid)
- plen = u64(splen)
- if plen:
- data = read(plen)
+ if h.plen:
+ data = afile.read(h.plen)
else:
- data=''
- pos=u64(read(8))
+ data = ''
+ pos = h.back
- if tpos: file.seek(tpos) # Restore temp file to end
+ if tpos:
+ self._tfile.seek(tpos) # Restore temp file to end
- return serial, pos, data, version
-
- def _getVersion(self, oid, pos):
- self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
- doid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- assert doid == oid
- if vlen:
- h = self._file.read(16)
- return self._file.read(vlen), h[:8]
- else:
- return '', ''
+ return h.serial, pos, data, h.version
def getSerial(self, oid):
self._lock_acquire()
try:
result = self._get_cached_serial(oid)
if result is None:
- try:
- result = self._getSerial(oid, self._index[oid])
- except KeyError:
- raise POSKeyError(oid)
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
+ pos = self._lookup_pos(oid)
+ result = self._getSerial(oid, pos)
return result
finally:
self._lock_release()
@@ -1135,20 +1021,15 @@
def _getSerial(self, oid, pos):
self._file.seek(pos)
h = self._file.read(16)
- if len(h) < 16:
- raise CorruptedDataError(h)
- h += self._file.read(26) # get rest of header
- if h[:8] != oid:
- raise CorruptedDataError(h)
- oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- if splen == z64:
- # a back pointer
- bp = self._file.read(8)
- if bp == z64:
- # If the backpointer is 0 (encoded as z64), then
- # this transaction undoes the object creation.
- raise KeyError(oid)
- return serial
+ assert oid == h[:8]
+ return h[8:]
+
+ def _getVersion(self, oid, pos):
+ h = self._read_data_header(pos, oid)
+ if h.version:
+ return h.version, h.pnv
+ else:
+ return "", None
def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
"""Get the indo information for a data record
@@ -1159,12 +1040,13 @@
be 0, but the pickle can be empty *and* the pointer 0.
"""
- copy=1 # Can we just copy a data pointer
+ copy = 1 # Can we just copy a data pointer
# First check if it is possible to undo this record.
- tpos=self._tindex.get(oid, 0)
- ipos=self._index.get(oid, 0)
- tipos=tpos or ipos
+ tpos = self._tindex.get(oid, 0)
+ ipos = self._index.get(oid, 0)
+ tipos = tpos or ipos
+
if tipos != pos:
# Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are.
@@ -1182,8 +1064,8 @@
cdataptr == tipos
or
# Backpointers are different
- _loadBackPOS(self._file, oid, p64(pos)) !=
- _loadBackPOS(self._file, oid, p64(cdataptr))
+ self._loadBackPOS(oid, pos) !=
+ self._loadBackPOS(oid, cdataptr)
):
if pre and not tpos:
copy = 0 # we'll try to do conflict resolution
@@ -1200,20 +1082,20 @@
if not pre:
# There is no previous revision, because the object creation
# is being undone.
- return '', 0, '', '', ipos
+ return "", 0, "", "", ipos
version, snv = self._getVersion(oid, pre)
if copy:
# we can just copy our previous-record pointer forward
- return '', pre, version, snv, ipos
+ return "", pre, version, snv, ipos
try:
# returns data, serial tuple
- bdata = _loadBack(self._file, oid, p64(pre))[0]
+ bdata = self._loadBack_impl(oid, pre)[0]
except KeyError:
# couldn't find oid; what's the real explanation for this?
raise UndoError("_loadBack() failed for %s", oid)
- data=self.tryToResolveConflict(oid, cserial, serial, bdata, cdata)
+ data = self.tryToResolveConflict(oid, cserial, serial, bdata, cdata)
if data:
return data, 0, version, snv, ipos
@@ -1288,7 +1170,6 @@
def _txn_find(self, tid, stop_at_pack):
pos = self._pos
- # XXX Why 39? Only because undoLog() uses it as a boundary.
while pos > 39:
self._file.seek(pos - 8)
pos = pos - u64(self._file.read(8)) - 8
@@ -1306,67 +1187,54 @@
def _txn_undo_write(self, tpos):
# a helper function to write the data records for transactional undo
- ostloc = p64(self._pos)
- here = self._pos + (self._tfile.tell() + self._thl)
+ otloc = self._pos
+ here = self._pos + self._tfile.tell() + self._thl
+ base = here - self._tfile.tell()
# Let's move the file pointer back to the start of the txn record.
- self._file.seek(tpos)
- h = self._file.read(TRANS_HDR_LEN)
- if h[16] == 'u':
- return
- if h[16] != ' ':
+ th = self._read_txn_header(tpos)
+ if th.status != " ":
raise UndoError('non-undoable transaction')
- tl = u64(h[8:16])
- ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
- tend = tpos + tl
- pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
+ tend = tpos + th.tlen
+ pos = tpos + th.headerlen()
tindex = {}
- failures = {} # keep track of failures, cause we may succeed later
- failed = failures.has_key
+
+ # keep track of failures, cause we may succeed later
+ failures = {}
# Read the data records for this transaction
while pos < tend:
- self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
- oid, serial, sprev, stloc, vlen, splen = \
- struct.unpack(DATA_HDR, h)
- if failed(oid):
- del failures[oid] # second chance!
- plen = u64(splen)
- prev = u64(sprev)
- if vlen:
- dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
- self._file.seek(16, 1)
- version = self._file.read(vlen)
- else:
- dlen = DATA_HDR_LEN + (plen or 8)
- version = ''
+ h = self._read_data_header(pos)
+ if h.oid in failures:
+ del failures[h.oid] # second chance!
+ assert base + self._tfile.tell() == here, (here, base,
+ self._tfile.tell())
try:
p, prev, v, snv, ipos = self._transactionalUndoRecord(
- oid, pos, serial, prev, version)
+ h.oid, pos, h.serial, h.prev, h.version)
except UndoError, v:
# Don't fail right away. We may be redeemed later!
- failures[oid] = v
+ failures[h.oid] = v
else:
- plen = len(p)
- self._tfile.write(pack(DATA_HDR,
- oid, self._serial, p64(ipos),
- ostloc, len(v), p64(plen)))
+ new = DataHeader(h.oid, self._serial, ipos, otloc, len(v),
+ len(p))
if v:
- vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
- self._tfile.write(snv + p64(vprev) + v)
+ vprev = self._tvindex.get(v, 0) or self._vindex.get(v, 0)
+ new.setVersion(v, snv, vprev)
self._tvindex[v] = here
- odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
- else:
- odlen = DATA_HDR_LEN + (plen or 8)
+ # XXX This seek shouldn't be necessary, but some other
+ # bit of code is messing with the file pointer.
+ assert self._tfile.tell() == here - base, (here, base,
+ self._tfile.tell())
+ self._tfile.write(new.asString())
if p:
self._tfile.write(p)
else:
self._tfile.write(p64(prev))
- tindex[oid] = here
- here += odlen
+ tindex[h.oid] = here
+ here += new.recordlen()
- pos += dlen
+ pos += h.recordlen()
if pos > tend:
raise UndoError("non-undoable transaction")
@@ -1439,47 +1307,43 @@
while 1:
if len(r) >= size: return r
- self._file.seek(pos)
- h=self._file.read(DATA_HDR_LEN)
- doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
- prev=u64(prev)
-
- if vlen:
- self._file.read(16)
- version = self._file.read(vlen)
- if wantver is not None and version != wantver:
- if prev:
- pos=prev
+ h = self._read_data_header(pos)
+
+ if h.version:
+ if wantver is not None and h.version != wantver:
+ if h.prev:
+ pos = h.prev
continue
else:
return r
else:
- version=''
- wantver=None
+ version = ""
+ wantver = None
- self._file.seek(u64(tloc))
- h = self._file.read(TRANS_HDR_LEN)
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
- user_name = self._file.read(ul)
- description = self._file.read(dl)
- if el:
- d=loads(self._file.read(el))
+ th = self._read_txn_header(h.tloc)
+ user_name = self._file.read(th.ulen)
+ description = self._file.read(th.dlen)
+ if th.elen:
+ d = loads(self._file.read(th.elen))
else:
- d={}
+ d = {}
- d['time']=TimeStamp(serial).timeTime()
- d['user_name']=user_name
- d['description']=description
- d['serial']=serial
- d['version']=version
- d['size']=u64(plen)
+ d["time"] = TimeStamp(h.serial).timeTime()
+ d["user_name"] = user_name
+ d["description"] = description
+ d["serial"] = h.serial
+ d["version"] = h.version
+ d["size"] = h.plen
if filter is None or filter(d):
r.append(d)
- if prev: pos=prev
- else: return r
- finally: self._lock_release()
+ if h.prev:
+ pos = h.prev
+ else:
+ return r
+ finally:
+ self._lock_release()
def _redundant_pack(self, file, pos):
assert pos > 8, pos
@@ -1770,13 +1634,16 @@
seek = file.seek
seek(0, 2)
file_size=file.tell()
+ fmt = TempFormatter(file)
if file_size:
if file_size < start: raise FileStorageFormatError, file.name
seek(0)
- if read(4) != packed_version: raise FileStorageFormatError, name
+ if read(4) != packed_version:
+ raise FileStorageFormatError, name
else:
- if not read_only: file.write(packed_version)
+ if not read_only:
+ file.write(packed_version)
return 4L, maxoid, ltid
index_get=index.get
@@ -1796,15 +1663,13 @@
file.truncate()
break
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
+ tid, tl, status, ul, dl, el = unpack(TRANS_HDR,h)
if el < 0: el=t32-el
if tid <= ltid:
warn("%s time-stamp reduction at %s", name, pos)
ltid = tid
- tl=u64(stl)
-
if pos+(tl+8) > file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
@@ -1841,7 +1706,8 @@
if recover: return pos, None, None
panic('%s has invalid transaction header at %s', name, pos)
- if tid >= stop: break
+ if tid >= stop:
+ break
tpos=pos
tend=tpos+tl
@@ -1849,45 +1715,31 @@
if status=='u':
# Undone transaction, skip it
seek(tend)
- h=read(8)
- if h != stl:
+ h=u64(read(8))
+ if h != tl:
if recover: return tpos, None, None
panic('%s has inconsistent transaction length at %s',
name, pos)
pos=tend+8
continue
- pos=tpos+(TRANS_HDR_LEN+ul+dl+el)
+ pos = tpos+ TRANS_HDR_LEN + ul + dl + el
while pos < tend:
# Read the data records for this transaction
-
- seek(pos)
- h=read(DATA_HDR_LEN)
- oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
- prev=u64(sprev)
- tloc=u64(stloc)
- plen=u64(splen)
-
- dlen=DATA_HDR_LEN+(plen or 8)
- tindex[oid]=pos
-
- if vlen:
- dlen=dlen+(16+vlen)
- read(16)
- pv=u64(read(8))
- version=read(vlen)
- # Jim says: "It's just not worth the bother."
- #if vndexpos(version, 0) != pv:
- # panic("%s incorrect previous version pointer at %s",
- # name, pos)
- vindex[version]=pos
-
- if pos+dlen > tend or tloc != tpos:
- if recover: return tpos, None, None
+ h = fmt._read_data_header(pos)
+ dlen = h.recordlen()
+ tindex[h.oid] = pos
+
+ if h.version:
+ vindex[h.version] = pos
+
+ if pos + dlen > tend or h.tloc != tpos:
+ if recover:
+ return tpos, None, None
panic("%s data record exceeds transaction record at %s",
name, pos)
- if index_get(oid, 0) != prev:
+ if index_get(h.oid, 0) != h.prev:
if prev:
if recover: return tpos, None, None
error("%s incorrect previous pointer at %s", name, pos)
@@ -1902,8 +1754,8 @@
# Read the (intentionally redundant) transaction length
seek(pos)
- h=read(8)
- if h != stl:
+ h = u64(read(8))
+ if h != tl:
if recover: return tpos, None, None
panic("%s redundant transaction length check failed at %s",
name, pos)
@@ -2006,14 +1858,14 @@
return self.__current
-class FileIterator(Iterator):
+class FileIterator(Iterator, FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid = z64
_file = None
def __init__(self, file, start=None, stop=None):
- if isinstance(file, StringType):
+ if isinstance(file, str):
file = open(file, 'rb')
self._file = file
if file.read(4) != packed_version:
@@ -2021,8 +1873,8 @@
file.seek(0,2)
self._file_size = file.tell()
self._pos = 4L
- assert start is None or isinstance(start, StringType)
- assert stop is None or isinstance(stop, StringType)
+ assert start is None or isinstance(start, str)
+ assert stop is None or isinstance(stop, str)
if start:
self._skip_to_start(start)
self._stop = stop
@@ -2079,35 +1931,30 @@
# A closed iterator. XXX: Is IOError the best we can do? For
# now, mimic a read on a closed file.
raise IOError, 'iterator is closed'
- file=self._file
- seek=file.seek
- read=file.read
- pos=self._pos
+ pos = self._pos
while 1:
# Read the transaction record
- seek(pos)
- h=read(TRANS_HDR_LEN)
- if len(h) < TRANS_HDR_LEN: break
-
-
- tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
- if el < 0: el=t32-el
-
- if tid <= self._ltid:
+ try:
+ h = self._read_txn_header(pos)
+ except CorruptedDataError, err:
+ # If buf is empty, we've reached EOF.
+ if not err.buf:
+ break
+ raise
+
+ if h.tid <= self._ltid:
warn("%s time-stamp reduction at %s", self._file.name, pos)
- self._ltid=tid
+ self._ltid = h.tid
- if self._stop is not None and tid > self._stop:
+ if self._stop is not None and h.tid > self._stop:
raise IndexError, index
- if status == 'c':
+ if h.status == "c":
# Assume we've hit the last, in-progress transaction
raise IndexError, index
- tl=u64(stl)
-
- if pos+(tl+8) > self._file_size:
+ if pos + h.tlen + 8 > self._file_size:
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
@@ -2115,22 +1962,22 @@
self._file.name, pos)
break
- if status not in ' up':
+ if h.status not in " up":
warn('%s has invalid status, %s, at %s', self._file.name,
- status, pos)
+ h.status, pos)
- if tl < (TRANS_HDR_LEN+ul+dl+el):
+ if h.tlen < h.headerlen():
# We're in trouble. Find out if this is bad data in
# the middle of the file, or just a turd that Win 9x
# dropped at the end when the system crashed. Skip to
# the end and read what should be the transaction
# length of the last transaction.
- seek(-8, 2)
- rtl=u64(read(8))
+ self._file.seek(-8, 2)
+ rtl = u64(self._file.read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
- nearPanic('%s has invalid transaction header at %s',
+ nearPanic("%s has invalid transaction header at %s",
self._file.name, pos)
warn("It appears that there is invalid data at the end of "
"the file, possibly due to a system crash. %s "
@@ -2138,49 +1985,42 @@
% self._file.name)
break
else:
- warn('%s has invalid transaction header at %s',
+ warn("%s has invalid transaction header at %s",
self._file.name, pos)
break
- tpos=pos
- tend=tpos+tl
+ tpos = pos
+ tend = tpos + h.tlen
+
+ if h.status != "u":
+ pos = tpos + h.headerlen()
+ user = self._file.read(h.ulen)
+ description = self._file.read(h.dlen)
+ e = {}
+ if h.elen:
+ try:
+ e = loads(self._file.read(h.elen))
+ except:
+ pass
- if status=='u':
- # Undone transaction, skip it
- seek(tend)
- h=read(8)
- if h != stl:
- panic('%s has inconsistent transaction length at %s',
- self._file.name, pos)
- pos=tend+8
- continue
-
- pos=tpos+(TRANS_HDR_LEN+ul+dl+el)
- user=read(ul)
- description=read(dl)
- if el:
- try: e=loads(read(el))
- except: e={}
- else: e={}
-
- result = RecordIterator(tid, status, user, description, e, pos,
- tend, file, tpos)
- pos = tend
+ result = RecordIterator(h.tid, h.status, user, description,
+ e, pos, tend, self._file, tpos)
# Read the (intentionally redundant) transaction length
- seek(pos)
- h = read(8)
- if h != stl:
+ self._file.seek(tend)
+ rtl = u64(self._file.read(8))
+ if rtl != h.tlen:
warn("%s redundant transaction length check failed at %s",
- self._file.name, pos)
+ self._file.name, tend)
break
- self._pos = pos + 8
+ self._pos = tend + 8
return result
raise IndexError, index
-class RecordIterator(Iterator, BaseStorage.TransactionRecord):
+class RecordIterator(Iterator, BaseStorage.TransactionRecord,
+ FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file."""
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
self.tid = tid
@@ -2197,51 +2037,31 @@
pos = self._pos
while pos < self._tend:
# Read the data records for this transaction
- self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
- oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- prev = u64(sprev)
- tloc = u64(stloc)
- plen = u64(splen)
- dlen = DATA_HDR_LEN + (plen or 8)
+ h = self._read_data_header(pos)
+ dlen = h.recordlen()
- if vlen:
- dlen += (16 + vlen)
- tmp = self._file.read(16)
- pv = u64(tmp[8:16])
- version = self._file.read(vlen)
- else:
- version = ''
-
- datapos = pos + DATA_HDR_LEN
- if vlen:
- datapos += 16 + vlen
- assert self._file.tell() == datapos, (self._file.tell(), datapos)
-
- if pos + dlen > self._tend or tloc != self._tpos:
+ if pos + dlen > self._tend or h.tloc != self._tpos:
warn("%s data record exceeds transaction record at %s",
file.name, pos)
break
self._pos = pos + dlen
prev_txn = None
- if plen:
- data = self._file.read(plen)
+ if h.plen:
+ data = self._file.read(h.plen)
else:
- bp = self._file.read(8)
- if bp == z64:
- # If the backpointer is 0 (encoded as z64), then
- # this transaction undoes the object creation. It
- # either aborts the version that created the
- # object or undid the transaction that created it.
- # Return None instead of a pickle to indicate
- # this.
+ if h.back == 0:
+ # If the backpointer is 0, then this transaction
+ # undoes the object creation. It either aborts
+ # the version that created the object or undid the
+ # transaction that created it. Return None
+ # instead of a pickle to indicate this.
data = None
else:
- data, _s, tid = _loadBackTxn(self._file, oid, bp)
- prev_txn = getTxnFromData(self._file, oid, bp)
+ data, _s, tid = self._loadBackTxn(h.oid, h.back, False)
+ prev_txn = self.getTxnFromData(h.oid, h.back)
- r = Record(oid, serial, version, data, prev_txn)
+ r = Record(h.oid, h.serial, h.version, data, prev_txn)
return r
=== ZODB3/ZODB/FileStorage/format.py 1.1.2.2 => 1.1.2.3 ===
--- ZODB3/ZODB/FileStorage/format.py:1.1.2.2 Thu Oct 9 12:10:39 2003
+++ ZODB3/ZODB/FileStorage/format.py Thu Oct 9 16:26:23 2003
@@ -231,6 +231,9 @@
tid = h[:8]
return data, serial, tid
+ def _loadBackPOS(self, oid, back):
+ return self._loadBack_impl(oid, back)[2]
+
def getTxnFromData(self, oid, back):
"""Return transaction id for data at back."""
h = self._read_data_header(back, oid)
More information about the Zodb-checkins
mailing list