[Zodb-checkins] CVS: ZODB3/ZODB/FileStorage - fspack.py:1.2
fsdump.py:1.2 format.py:1.2 __init__.py:1.2 FileStorage.py:1.2
Jeremy Hylton
jeremy at zope.com
Wed Dec 24 11:02:31 EST 2003
Update of /cvs-repository/ZODB3/ZODB/FileStorage
In directory cvs.zope.org:/tmp/cvs-serv27465/ZODB/FileStorage
Added Files:
fspack.py fsdump.py format.py __init__.py FileStorage.py
Log Message:
Merge MVCC branch to the HEAD.
=== ZODB3/ZODB/FileStorage/fspack.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:31 2003
+++ ZODB3/ZODB/FileStorage/fspack.py Wed Dec 24 11:01:59 2003
@@ -0,0 +1,647 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""FileStorage helper to perform pack.
+
+A storage contains an ordered set of object revisions. When a storage
+is packed, object revisions that are not reachable as of the pack time
+are deleted. The notion of reachability is complicated by
+backpointers -- object revisions that point to earlier revisions of
+the same object.
+
+An object revision is reachable at a certain time if it is reachable
+from the revision of the root at that time or if it is reachable from
+a backpointer after that time.
+"""
+
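+# An illustrative example of the rule above (not from the original
+# docstring): suppose the pack time falls between transactions T2 and
+# T3, and object B's most recent revision as of T2 was written in T1.
+# B's T1 revision survives a pack if the root object (as of T2) still
+# refers to B, or if some data record written after the pack time
+# (say, an undo in T4) holds a backpointer that resolves to the T1
+# record.
+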
+# This module contains code backported from ZODB4 from the
+# zodb.storage.file package. It's been edited heavily to work with
+# ZODB3 code and storage layout.
+
+import os
+import struct
+from types import StringType
+
+from ZODB.POSException import UndoError
+from ZODB.referencesf import referencesf
+from ZODB.utils import p64, u64, z64, oid_repr
+from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
+
+from ZODB.fsIndex import fsIndex
+from ZODB.FileStorage.format \
+ import FileStorageFormatter, CorruptedDataError, DataHeader, \
+ TRANS_HDR_LEN
+
+class DataCopier(FileStorageFormatter):
+ """Mixin class for copying transactions into a storage.
+
+ The restore() and pack() methods share a need to copy data records
+ and update pointers to data in earlier transaction records. This
+ class provides the shared logic.
+
+ The mixin extends the FileStorageFormatter with a copy() method.
+    It also requires that the concrete class provide the following
+    attributes:
+
+    _file -- file containing earlier, already-written data records
+    _tfile -- destination file for copied data
+    _packt -- p64() representation of latest pack time
+    _pos -- file pos of destination transaction
+    _index -- maps oid to committed data record file pos
+    _vindex -- maps version name to committed data record file pos
+    _tindex -- maps oid to data record file pos, for the current txn
+    _tvindex -- maps version name to data record file pos, for the current txn
+
+    _tindex and _tvindex are updated by copy().
+
+ The copy() method does not do any locking.
+ """
+
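+    # A minimal concrete subclass might look like the following sketch
+    # (illustrative only; ExampleCopier and its argument names are
+    # hypothetical, not part of ZODB):
+    #
+    #   class ExampleCopier(DataCopier):
+    #       def __init__(self, src, dst, packt):
+    #           self._file = src        # file holding existing data
+    #           self._tfile = dst       # destination for copied data
+    #           self._packt = packt     # p64() pack time
+    #           self._pos = None        # set per destination transaction
+    #           self._index, self._vindex = fsIndex(), {}
+    #           self._tindex, self._tvindex = {}, {}
+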
+ def _txn_find(self, tid, stop_at_pack):
+ # _pos always points just past the last transaction
+ pos = self._pos
+ while pos > 4:
+ self._file.seek(pos - 8)
+ pos = pos - u64(self._file.read(8)) - 8
+ self._file.seek(pos)
+ h = self._file.read(TRANS_HDR_LEN)
+ _tid = h[:8]
+ if _tid == tid:
+ return pos
+ if stop_at_pack:
+ if h[16] == 'p':
+ break
+ raise UndoError(None, "Invalid transaction id")
+
+ def _data_find(self, tpos, oid, data):
+        # Return the file position of the data record for oid in the
+        # transaction at tpos; the caller uses this position as a
+        # backpointer. The record should contain a pickle identical to
+        # data. Returns 0 on failure.
+ # Must call with lock held.
+ h = self._read_txn_header(tpos)
+ tend = tpos + h.tlen
+ pos = self._file.tell()
+ while pos < tend:
+ h = self._read_data_header(pos)
+ if h.oid == oid:
+ # Make sure this looks like the right data record
+ if h.plen == 0:
+ # This is also a backpointer. Gotta trust it.
+ return pos
+ if h.plen != len(data):
+ # The expected data doesn't match what's in the
+ # backpointer. Something is wrong.
+                    LOG("FS pack", ERROR,
+                        "Mismatch between data and backpointer at %d" % pos)
+ return 0
+ _data = self._file.read(h.plen)
+ if data != _data:
+ return 0
+ return pos
+ pos += h.recordlen()
+ return 0
+
+ def _restore_pnv(self, oid, prev, version, bp):
+ # Find a valid pnv (previous non-version) pointer for this version.
+
+ # If there is no previous record, there can't be a pnv.
+ if not prev:
+ return None
+
+ pnv = None
+ h = self._read_data_header(prev, oid)
+ # If the previous record is for a version, it must have
+ # a valid pnv.
+ if h.version:
+ return h.pnv
+ elif bp:
+ # XXX Not sure the following is always true:
+ # The previous record is not for this version, yet we
+ # have a backpointer to it. The current record must
+ # be an undo of an abort or commit, so the backpointer
+ # must be to a version record with a pnv.
+ h2 = self._read_data_header(bp, oid)
+ if h2.version:
+ return h2.pnv
+ else:
+                LOG("FS pack", WARNING,
+                    "restore could not find previous non-version data "
+                    "at %d or %d" % (prev, bp))
+ return None
+
+ def _resolve_backpointer(self, prev_txn, oid, data):
+ prev_pos = 0
+ if prev_txn is not None:
+ prev_txn_pos = self._txn_find(prev_txn, 0)
+ if prev_txn_pos:
+ prev_pos = self._data_find(prev_txn_pos, oid, data)
+ return prev_pos
+
+ def copy(self, oid, serial, data, version, prev_txn,
+ txnpos, datapos):
+ prev_pos = self._resolve_backpointer(prev_txn, oid, data)
+ old = self._index.get(oid, 0)
+ # Calculate the pos the record will have in the storage.
+ here = datapos
+ # And update the temp file index
+ self._tindex[oid] = here
+ if prev_pos:
+ # If there is a valid prev_pos, don't write data.
+ data = None
+ if data is None:
+ dlen = 0
+ else:
+ dlen = len(data)
+ # Write the recovery data record
+ h = DataHeader(oid, serial, old, txnpos, len(version), dlen)
+ if version:
+ h.version = version
+ pnv = self._restore_pnv(oid, old, version, prev_pos)
+ if pnv is not None:
+ h.pnv = pnv
+ else:
+ h.pnv = old
+ # Link to the last record for this version
+ h.vprev = self._tvindex.get(version, 0)
+ if not h.vprev:
+ h.vprev = self._vindex.get(version, 0)
+ self._tvindex[version] = here
+
+ self._tfile.write(h.asString())
+ # Write the data or a backpointer
+ if data is None:
+ if prev_pos:
+ self._tfile.write(p64(prev_pos))
+ else:
+ # Write a zero backpointer, which indicates an
+ # un-creation transaction.
+ self._tfile.write(z64)
+ else:
+ self._tfile.write(data)
+
+class GC(FileStorageFormatter):
+
+ def __init__(self, file, eof, packtime):
+ self._file = file
+ self._name = file.name
+ self.eof = eof
+ self.packtime = packtime
+ # packpos: position of first txn header after pack time
+ self.packpos = None
+ self.oid2curpos = fsIndex() # maps oid to current data record position
+ self.oid2verpos = fsIndex() # maps oid to current version data
+
+ # The set of reachable revisions of each object.
+ #
+        # This set is managed using two data structures. The first is
+        # an fsIndex mapping oids to one data record pos. Since only
+        # a few objects will have more than one revision, we use this
+        # efficient data structure to handle the common case. The
+        # second is a dictionary mapping oids to lists of
+        # positions; it is used to handle the small number of objects
+        # for which we must keep multiple revisions.
+
+ self.reachable = fsIndex()
+ self.reach_ex = {}
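+        # For example (illustrative, with made-up positions): if oid A
+        # has reachable revisions at file positions 1024 and 2048, then
+        # reachable[A] == 1024 and reach_ex[A] == [2048]; an oid with a
+        # single reachable revision appears only in reachable.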
+
+ # keep ltid for consistency checks during initial scan
+ self.ltid = z64
+
+ def isReachable(self, oid, pos):
+ """Return 1 if revision of `oid` at `pos` is reachable."""
+
+ rpos = self.reachable.get(oid)
+ if rpos is None:
+ return 0
+ if rpos == pos:
+ return 1
+ return pos in self.reach_ex.get(oid, [])
+
+ def findReachable(self):
+ self.buildPackIndex()
+ self.findReachableAtPacktime([z64])
+ self.findReachableFromFuture()
+ # These mappings are no longer needed and may consume a lot
+ # of space.
+ del self.oid2verpos
+ del self.oid2curpos
+
+ def buildPackIndex(self):
+ pos = 4L
+ while pos < self.eof:
+ th = self._read_txn_header(pos)
+ if th.tid > self.packtime:
+ break
+ self.checkTxn(th, pos)
+
+ tpos = pos
+ end = pos + th.tlen
+ pos += th.headerlen()
+
+ while pos < end:
+ dh = self._read_data_header(pos)
+ self.checkData(th, tpos, dh, pos)
+ if dh.version:
+ self.oid2verpos[dh.oid] = pos
+ else:
+ self.oid2curpos[dh.oid] = pos
+ pos += dh.recordlen()
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+ pos += 8
+
+ self.packpos = pos
+
+ def findReachableAtPacktime(self, roots):
+ """Mark all objects reachable from the oids in roots as reachable."""
+ todo = list(roots)
+ while todo:
+ oid = todo.pop()
+ if self.reachable.has_key(oid):
+ continue
+
+ L = []
+
+ pos = self.oid2curpos.get(oid)
+ if pos is not None:
+ L.append(pos)
+ todo.extend(self.findrefs(pos))
+
+ pos = self.oid2verpos.get(oid)
+ if pos is not None:
+ L.append(pos)
+ todo.extend(self.findrefs(pos))
+
+ if not L:
+ continue
+
+ pos = L.pop()
+ self.reachable[oid] = pos
+ if L:
+ self.reach_ex[oid] = L
+
+ def findReachableFromFuture(self):
+ # In this pass, the roots are positions of object revisions.
+ # We add a pos to extra_roots when there is a backpointer to a
+ # revision that was not current at the packtime. The
+ # non-current revision could refer to objects that were
+ # otherwise unreachable at the packtime.
+ extra_roots = []
+
+ pos = self.packpos
+ while pos < self.eof:
+ th = self._read_txn_header(pos)
+ self.checkTxn(th, pos)
+ tpos = pos
+ end = pos + th.tlen
+ pos += th.headerlen()
+
+ while pos < end:
+ dh = self._read_data_header(pos)
+ self.checkData(th, tpos, dh, pos)
+
+ if dh.back and dh.back < self.packpos:
+ if self.reachable.has_key(dh.oid):
+ L = self.reach_ex.setdefault(dh.oid, [])
+ if dh.back not in L:
+ L.append(dh.back)
+ extra_roots.append(dh.back)
+ else:
+ self.reachable[dh.oid] = dh.back
+
+ if dh.version and dh.pnv:
+ if self.reachable.has_key(dh.oid):
+ L = self.reach_ex.setdefault(dh.oid, [])
+ if dh.pnv not in L:
+ L.append(dh.pnv)
+ extra_roots.append(dh.pnv)
+ else:
+                        self.reachable[dh.oid] = dh.pnv
+
+ pos += dh.recordlen()
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+ pos += 8
+
+ for pos in extra_roots:
+ refs = self.findrefs(pos)
+ self.findReachableAtPacktime(refs)
+
+    def findrefs(self, pos):
+        """Return a list of the oids referenced by the data record at pos."""
+ dh = self._read_data_header(pos)
+ # Chase backpointers until we get to the record with the refs
+ while dh.back:
+ dh = self._read_data_header(dh.back)
+ if dh.plen:
+ return referencesf(self._file.read(dh.plen))
+ else:
+ return []
+
+class PackCopier(DataCopier):
+
+ # PackCopier has to cope with _file and _tfile being the
+ # same file. The copy() implementation is written assuming
+ # that they are different, so that using one object doesn't
+ # mess up the file pointer for the other object.
+
+ # PackCopier overrides _resolve_backpointer() and _restore_pnv()
+ # to guarantee that they keep the file pointer for _tfile in
+ # the right place.
+
+ def __init__(self, f, index, vindex, tindex, tvindex):
+ self._file = f
+ self._tfile = f
+ self._index = index
+ self._vindex = vindex
+ self._tindex = tindex
+ self._tvindex = tvindex
+ self._pos = None
+
+ def setTxnPos(self, pos):
+ self._pos = pos
+
+ def _resolve_backpointer(self, prev_txn, oid, data):
+ pos = self._tfile.tell()
+ try:
+ return DataCopier._resolve_backpointer(self, prev_txn, oid, data)
+ finally:
+ self._tfile.seek(pos)
+
+ def _restore_pnv(self, oid, prev, version, bp):
+ pos = self._tfile.tell()
+ try:
+ return DataCopier._restore_pnv(self, oid, prev, version, bp)
+ finally:
+ self._tfile.seek(pos)
+
+class FileStoragePacker(FileStorageFormatter):
+
+ def __init__(self, path, stop, la, lr, cla, clr):
+ self._name = path
+ self._file = open(path, "rb")
+ self._stop = stop
+ self._packt = None
+ self.locked = 0
+ self._file.seek(0, 2)
+ self.file_end = self._file.tell()
+ self._file.seek(0)
+
+ self.gc = GC(self._file, self.file_end, self._stop)
+
+ # The packer needs to acquire the parent's commit lock
+ # during the copying stage, so the two sets of lock acquire
+ # and release methods are passed to the constructor.
+ self._lock_acquire = la
+ self._lock_release = lr
+ self._commit_lock_acquire = cla
+ self._commit_lock_release = clr
+
+ # The packer will use several indexes.
+ # index: oid -> pos
+ # vindex: version -> pos of XXX
+ # tindex: oid -> pos, for current txn
+ # tvindex: version -> pos of XXX, for current txn
+ # oid2tid: not used by the packer
+
+ self.index = fsIndex()
+ self.vindex = {}
+ self.tindex = {}
+ self.tvindex = {}
+ self.oid2tid = {}
+ self.toid2tid = {}
+ self.toid2tid_delete = {}
+
+ # Index for non-version data. This is a temporary structure
+ # to reduce I/O during packing
+ self.nvindex = fsIndex()
+
+ def pack(self):
+ # Pack copies all data reachable at the pack time or later.
+ #
+ # Copying occurs in two phases. In the first phase, txns
+        # before the pack time are copied if they contain any reachable
+ # data. In the second phase, all txns after the pack time
+ # are copied.
+ #
+ # Txn and data records contain pointers to previous records.
+ # Because these pointers are stored as file offsets, they
+ # must be updated when we copy data.
+
+ # XXX Need to add sanity checking to pack
+
+ self.gc.findReachable()
+
+ # Setup the destination file and copy the metadata.
+ # XXX rename from _tfile to something clearer
+ self._tfile = open(self._name + ".pack", "w+b")
+ self._file.seek(0)
+ self._tfile.write(self._file.read(self._metadata_size))
+
+ self._copier = PackCopier(self._tfile, self.index, self.vindex,
+ self.tindex, self.tvindex)
+
+ ipos, opos = self.copyToPacktime()
+ assert ipos == self.gc.packpos
+ if ipos == opos:
+ # pack didn't free any data. there's no point in continuing.
+ self._tfile.close()
+ os.remove(self._name + ".pack")
+ return None
+ self._commit_lock_acquire()
+ self.locked = 1
+ self._lock_acquire()
+ try:
+ self._file.seek(0, 2)
+ self.file_end = self._file.tell()
+ finally:
+ self._lock_release()
+ if ipos < self.file_end:
+ self.copyRest(ipos)
+
+ # OK, we've copied everything. Now we need to wrap things up.
+ pos = self._tfile.tell()
+ self._tfile.flush()
+ self._tfile.close()
+ self._file.close()
+
+ return pos
+
+ def copyToPacktime(self):
+ offset = 0L # the amount of space freed by packing
+ pos = self._metadata_size
+ new_pos = pos
+
+ while pos < self.gc.packpos:
+ th = self._read_txn_header(pos)
+ new_tpos, pos = self.copyDataRecords(pos, th)
+
+ if new_tpos:
+ new_pos = self._tfile.tell() + 8
+ tlen = new_pos - new_tpos - 8
+ # Update the transaction length
+ self._tfile.seek(new_tpos + 8)
+ self._tfile.write(p64(tlen))
+ self._tfile.seek(new_pos - 8)
+ self._tfile.write(p64(tlen))
+
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+ pos += 8
+
+ return pos, new_pos
+
+    def fetchBackpointer(self, oid, back):
+        """Return the data for backpointer `back` to object `oid`.
+
+        If `back` is 0 or ultimately resolves to 0, return None.
+        In this case, the transaction undoes the object creation.
+ """
+ if back == 0:
+ return None
+ data, tid = self._loadBackTxn(oid, back, 0)
+ return data
+
+ def copyDataRecords(self, pos, th):
+ """Copy any current data records between pos and tend.
+
+ Returns position of txn header in output file and position
+ of next record in the input file.
+
+ If any data records are copied, also write txn header (th).
+ """
+ copy = 0
+ new_tpos = 0L
+ tend = pos + th.tlen
+ pos += th.headerlen()
+ while pos < tend:
+ h = self._read_data_header(pos)
+ if not self.gc.isReachable(h.oid, pos):
+ pos += h.recordlen()
+ continue
+ pos += h.recordlen()
+
+ # If we are going to copy any data, we need to copy
+ # the transaction header. Note that we will need to
+ # patch up the transaction length when we are done.
+ if not copy:
+ th.status = "p"
+ s = th.asString()
+ new_tpos = self._tfile.tell()
+ self._tfile.write(s)
+ new_pos = new_tpos + len(s)
+ copy = 1
+
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ # If a current record has a backpointer, fetch
+ # refs and data from the backpointer. We need
+ # to write the data in the new record.
+ data = self.fetchBackpointer(h.oid, h.back)
+
+ self.writePackedDataRecord(h, data, new_tpos)
+ new_pos = self._tfile.tell()
+
+ return new_tpos, pos
+
+ def writePackedDataRecord(self, h, data, new_tpos):
+ # Update the header to reflect current information, then write
+ # it to the output file.
+ if data is None:
+ data = ""
+ h.prev = 0
+ h.back = 0
+ h.plen = len(data)
+ h.tloc = new_tpos
+ pos = self._tfile.tell()
+ if h.version:
+ h.pnv = self.index.get(h.oid, 0)
+ h.vprev = self.vindex.get(h.version, 0)
+ self.vindex[h.version] = pos
+ self.index[h.oid] = pos
+ if h.version:
+ self.vindex[h.version] = pos
+ self._tfile.write(h.asString())
+ self._tfile.write(data)
+ if not data:
+ # Packed records never have backpointers (?).
+ # If there is no data, write a z64 backpointer.
+ # This is a George Bailey event.
+ self._tfile.write(z64)
+
+ def copyRest(self, ipos):
+ # After the pack time, all data records are copied.
+ # Copy one txn at a time, using copy() for data.
+
+ # Release the commit lock every 20 copies
+ self._lock_counter = 0
+
+ try:
+ while 1:
+ ipos = self.copyOne(ipos)
+ except CorruptedDataError, err:
+ # The last call to copyOne() will raise
+ # CorruptedDataError, because it will attempt to read past
+ # the end of the file. Double-check that the exception
+ # occurred for this reason.
+ self._file.seek(0, 2)
+ endpos = self._file.tell()
+ if endpos != err.pos:
+ raise
+
+ def copyOne(self, ipos):
+ # The call below will raise CorruptedDataError at EOF.
+ th = self._read_txn_header(ipos)
+ self._lock_counter += 1
+ if self._lock_counter % 20 == 0:
+ self._commit_lock_release()
+ pos = self._tfile.tell()
+ self._copier.setTxnPos(pos)
+ self._tfile.write(th.asString())
+ tend = ipos + th.tlen
+ ipos += th.headerlen()
+
+ while ipos < tend:
+ h = self._read_data_header(ipos)
+ ipos += h.recordlen()
+ prev_txn = None
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ data = self.fetchBackpointer(h.oid, h.back)
+ if h.back:
+ prev_txn = self.getTxnFromData(h.oid, h.back)
+
+ self._copier.copy(h.oid, h.tid, data, h.version,
+ prev_txn, pos, self._tfile.tell())
+
+ tlen = self._tfile.tell() - pos
+ assert tlen == th.tlen
+ self._tfile.write(p64(tlen))
+ ipos += 8
+
+ self.index.update(self.tindex)
+ self.tindex.clear()
+ self.vindex.update(self.tvindex)
+ self.tvindex.clear()
+ if self._lock_counter % 20 == 0:
+ self._commit_lock_acquire()
+ return ipos
=== ZODB3/ZODB/FileStorage/fsdump.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:31 2003
+++ ZODB3/ZODB/FileStorage/fsdump.py Wed Dec 24 11:01:59 2003
@@ -0,0 +1,155 @@
+from ZODB.FileStorage import FileIterator
+from ZODB.FileStorage.format \
+ import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
+from ZODB.TimeStamp import TimeStamp
+from ZODB.utils import u64
+from ZODB.tests.StorageTestBase import zodb_unpickle
+
+from cPickle import Unpickler
+from cStringIO import StringIO
+import md5
+import struct
+import types
+
+def get_pickle_metadata(data):
+ # ZODB's data records contain two pickles. The first is the class
+ # of the object, the second is the object.
+ if data.startswith('(c'):
+ # Don't actually unpickle a class, because it will attempt to
+ # load the class. Just break open the pickle and get the
+ # module and class from it.
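+        # For instance (illustrative; foo.bar.Baz is a made-up name), a
+        # record whose pickle starts with "(cfoo.bar\nBaz\n..." yields
+        # modname "foo.bar" and classname "Baz" from the split below.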
+ modname, classname, rest = data.split('\n', 2)
+ modname = modname[2:]
+ return modname, classname
+ f = StringIO(data)
+ u = Unpickler(f)
+ try:
+ class_info = u.load()
+ except Exception, err:
+ print "Error", err
+ return '', ''
+ if isinstance(class_info, types.TupleType):
+ if isinstance(class_info[0], types.TupleType):
+ modname, classname = class_info[0]
+ else:
+ modname, classname = class_info
+ else:
+ # XXX not sure what to do here
+ modname = repr(class_info)
+ classname = ''
+ return modname, classname
+
+def fsdump(path, file=None, with_offset=1):
+ i = 0
+ iter = FileIterator(path)
+ for trans in iter:
+ if with_offset:
+ print >> file, "Trans #%05d tid=%016x time=%s offset=%d" % \
+ (i, u64(trans.tid), str(TimeStamp(trans.tid)), trans._pos)
+ else:
+ print >> file, "Trans #%05d tid=%016x time=%s" % \
+ (i, u64(trans.tid), str(TimeStamp(trans.tid)))
+ print >> file, "\tstatus=%s user=%s description=%s" % \
+ (`trans.status`, trans.user, trans.description)
+ j = 0
+ for rec in trans:
+ if rec.data is None:
+ fullclass = "undo or abort of object creation"
+ else:
+ modname, classname = get_pickle_metadata(rec.data)
+ dig = md5.new(rec.data).hexdigest()
+ fullclass = "%s.%s" % (modname, classname)
+ # special case for testing purposes
+ if fullclass == "ZODB.tests.MinPO.MinPO":
+ obj = zodb_unpickle(rec.data)
+ fullclass = "%s %s" % (fullclass, obj.value)
+ if rec.version:
+ version = "version=%s " % rec.version
+ else:
+ version = ''
+ if rec.data_txn:
+ # XXX It would be nice to print the transaction number
+ # (i) but it would be too expensive to keep track of.
+ bp = "bp=%016x" % u64(rec.data_txn)
+ else:
+ bp = ""
+ print >> file, " data #%05d oid=%016x %sclass=%s %s" % \
+ (j, u64(rec.oid), version, fullclass, bp)
+ j += 1
+ print >> file
+ i += 1
+ iter.close()
+
+def fmt(p64):
+    # Return a nicely formatted string for a packed 64-bit value
+ return "%016x" % u64(p64)
+
+class Dumper:
+    """A very verbose dumper for debugging FileStorage problems."""
+
+ # XXX Should revise this class to use FileStorageFormatter.
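+
+    # Typical use (illustrative; "Data.fs" is a made-up file name):
+    #
+    #   Dumper("Data.fs").dump()
+    #
+    # writes a record-by-record dump to stdout; pass dest to redirect it.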
+
+ def __init__(self, path, dest=None):
+ self.file = open(path, "rb")
+ self.dest = dest
+
+ def dump(self):
+ fid = self.file.read(4)
+ print >> self.dest, "*" * 60
+ print >> self.dest, "file identifier: %r" % fid
+ while self.dump_txn():
+ pass
+
+ def dump_txn(self):
+ pos = self.file.tell()
+ h = self.file.read(TRANS_HDR_LEN)
+ if not h:
+ return False
+ tid, tlen, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
+ end = pos + tlen
+ print >> self.dest, "=" * 60
+ print >> self.dest, "offset: %d" % pos
+ print >> self.dest, "end pos: %d" % end
+ print >> self.dest, "transaction id: %s" % fmt(tid)
+ print >> self.dest, "trec len: %d" % tlen
+ print >> self.dest, "status: %r" % status
+ user = descr = extra = ""
+ if ul:
+ user = self.file.read(ul)
+ if dl:
+ descr = self.file.read(dl)
+ if el:
+ extra = self.file.read(el)
+ print >> self.dest, "user: %r" % user
+ print >> self.dest, "description: %r" % descr
+ print >> self.dest, "len(extra): %d" % el
+ while self.file.tell() < end:
+ self.dump_data(pos)
+ stlen = self.file.read(8)
+ print >> self.dest, "redundant trec len: %d" % u64(stlen)
+ return 1
+
+ def dump_data(self, tloc):
+ pos = self.file.tell()
+ h = self.file.read(DATA_HDR_LEN)
+ assert len(h) == DATA_HDR_LEN
+ oid, revid, prev, tloc, vlen, dlen = struct.unpack(DATA_HDR, h)
+ print >> self.dest, "-" * 60
+ print >> self.dest, "offset: %d" % pos
+ print >> self.dest, "oid: %s" % fmt(oid)
+ print >> self.dest, "revid: %s" % fmt(revid)
+ print >> self.dest, "previous record offset: %d" % prev
+ print >> self.dest, "transaction offset: %d" % tloc
+ if vlen:
+ pnv = self.file.read(8)
+ sprevdata = self.file.read(8)
+ version = self.file.read(vlen)
+ print >> self.dest, "version: %r" % version
+ print >> self.dest, "non-version data offset: %d" % u64(pnv)
+ print >> self.dest, \
+ "previous version data offset: %d" % u64(sprevdata)
+ print >> self.dest, "len(data): %d" % dlen
+ self.file.read(dlen)
+ if not dlen:
+ sbp = self.file.read(8)
+ print >> self.dest, "backpointer: %d" % u64(sbp)
=== ZODB3/ZODB/FileStorage/format.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:31 2003
+++ ZODB3/ZODB/FileStorage/format.py Wed Dec 24 11:01:59 2003
@@ -0,0 +1,343 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+#
+# File-based ZODB storage
+#
+# Files are arranged as follows.
+#
+# - The first 4 bytes are a file identifier.
+#
+# - The rest of the file consists of a sequence of transaction
+# "records".
+#
+# A transaction record consists of:
+#
+# - 8-byte transaction id, which is also a time stamp.
+#
+# - 8-byte transaction record length - 8.
+#
+# - 1-byte status code
+#
+# - 2-byte length of user name
+#
+# - 2-byte length of description
+#
+# - 2-byte length of extension attributes
+#
+# - user name
+#
+# - description
+#
+# - extension attributes
+#
+# * A sequence of data records
+#
+# - 8-byte redundant transaction length - 8.
+#
+# A data record consists of
+#
+# - 8-byte oid.
+#
+# - 8-byte tid, which matches the transaction id in the transaction record.
+#
+# - 8-byte previous-record file-position.
+#
+# - 8-byte beginning of transaction record file position.
+#
+# - 2-byte version length
+#
+# - 8-byte data length
+#
+# ? 8-byte position of non-version data
+# (if version length > 0)
+#
+# ? 8-byte position of previous record in this version
+# (if version length > 0)
+#
+# ? version string
+# (if version length > 0)
+#
+# ? data
+# (data length > 0)
+#
+# ? 8-byte position of data record containing data
+# (data length == 0)
+#
+# Note that the lengths and positions are all big-endian.
+# Also, the object ids and time stamps are big-endian, so comparisons
+# are meaningful.
+#
+# Version handling
+#
+# There isn't a separate store for versions. Each record has a
+# version field, indicating what version it is in. The records in a
+# version form a linked list. Each record that has a non-empty
+# version string has a pointer to the previous record in the version.
+# Version back pointers are retained *even* when versions are
+# committed or aborted or when transactions are undone.
+#
+# There is a notion of "current" version records, which are the
+# records in a version that are the current records for their
+# respective objects. When a version is committed, the current records
+# are committed to the destination version. When a version is
+# aborted, the current records are aborted.
+#
+# When committing or aborting, we search backward through the linked
+# list until we find a record for an object that does not have a
+# current record in the version. If we find a record for which the
+# non-version pointer is the same as the previous pointer, then we
+# forget that the corresponding object had a current record in the
+# version. This strategy allows us to avoid searching backward through
+# previously committed or aborted version records.
+#
+# Of course, we ignore records in undone transactions when committing
+# or aborting.
+#
+# Backpointers
+#
+# When we commit or abort a version, we don't copy (or delete)
+# any data. Instead, we write records with back pointers.
+#
+# A version record *never* has a back pointer to a non-version
+# record, because we never abort to a version. A non-version record
+# may have a back pointer to a version record or to a non-version
+# record.
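+#
+# As a rough illustration (not part of the original description), the
+# file identifier and the first transaction header can be read with the
+# struct formats defined below, assuming `path` names a storage file:
+#
+#   f = open(path, "rb")
+#   magic = f.read(4)                 # file identifier, e.g. "FS21"
+#   tid, tlen, status, ul, dl, el = struct.unpack(
+#       TRANS_HDR, f.read(TRANS_HDR_LEN))
+#   user = f.read(ul); descr = f.read(dl); ext = f.read(el)
+#   # the transaction's data records follow, then the redundant length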
+
+import struct
+
+from ZODB.POSException import POSKeyError
+from ZODB.referencesf import referencesf
+from ZODB.utils import p64, u64, z64, oid_repr, t32
+from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
+
+class CorruptedError(Exception):
+ pass
+
+class CorruptedDataError(CorruptedError):
+
+ def __init__(self, oid=None, buf=None, pos=None):
+ self.oid = oid
+ self.buf = buf
+ self.pos = pos
+
+ def __str__(self):
+ if self.oid:
+ msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
+ self.buf)
+ else:
+ msg = "Error reading unknown oid. Found %r" % self.buf
+ if self.pos:
+ msg += " at %d" % self.pos
+ return msg
+
+# the struct formats for the headers
+TRANS_HDR = ">8sQcHHH"
+DATA_HDR = ">8s8sQQHQ"
+# constants to support various header sizes
+TRANS_HDR_LEN = 23
+DATA_HDR_LEN = 42
+DATA_VERSION_HDR_LEN = 58
+assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
+assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
+
+class FileStorageFormatter(object):
+ """Mixin class that can read and write the low-level format."""
+
+ # subclasses must provide _file
+
+ _metadata_size = 4L
+ _format_version = "21"
+
+ def _read_num(self, pos):
+ """Read an 8-byte number."""
+ self._file.seek(pos)
+ return u64(self._file.read(8))
+
+ def _read_data_header(self, pos, oid=None):
+ """Return a DataHeader object for data record at pos.
+
+        If oid is not None, raise CorruptedDataError if the oid read
+        from the file does not match it.
+
+ If there is version data, reads the version part of the header.
+ If there is no pickle data, reads the back pointer.
+ """
+ self._file.seek(pos)
+ s = self._file.read(DATA_HDR_LEN)
+ if len(s) != DATA_HDR_LEN:
+ raise CorruptedDataError(oid, s, pos)
+ h = DataHeaderFromString(s)
+ if oid is not None and oid != h.oid:
+ raise CorruptedDataError(oid, s, pos)
+ if h.vlen:
+ s = self._file.read(16 + h.vlen)
+ h.parseVersion(s)
+ if not h.plen:
+ h.back = u64(self._file.read(8))
+ return h
+
+ def _write_version_header(self, file, pnv, vprev, version):
+ s = struct.pack(">8s8s", pnv, vprev)
+ file.write(s + version)
+
+ def _read_txn_header(self, pos, tid=None):
+ self._file.seek(pos)
+ s = self._file.read(TRANS_HDR_LEN)
+ if len(s) != TRANS_HDR_LEN:
+ raise CorruptedDataError(tid, s, pos)
+ h = TxnHeaderFromString(s)
+ if tid is not None and tid != h.tid:
+ raise CorruptedDataError(tid, s, pos)
+ h.user = self._file.read(h.ulen)
+ h.descr = self._file.read(h.dlen)
+ h.ext = self._file.read(h.elen)
+ return h
+
+ def _loadBack_impl(self, oid, back, fail=True):
+ # shared implementation used by various _loadBack methods
+ #
+ # If the backpointer ultimately resolves to 0:
+        # If fail is True, raise POSKeyError for the zero backpointer.
+        # If fail is False, return None as the data for the record
+        # with no backpointer.
+ while 1:
+ if not back:
+ # If backpointer is 0, object does not currently exist.
+ raise POSKeyError(oid)
+ h = self._read_data_header(back)
+ if h.plen:
+ return self._file.read(h.plen), h.tid, back, h.tloc
+ if h.back == 0 and not fail:
+ return None, h.tid, back, h.tloc
+ back = h.back
+
+ def _loadBackTxn(self, oid, back, fail=True):
+ """Return data and txn id for backpointer."""
+ return self._loadBack_impl(oid, back, fail)[:2]
+
+ def _loadBackPOS(self, oid, back):
+ return self._loadBack_impl(oid, back)[2]
+
+ def getTxnFromData(self, oid, back):
+ """Return transaction id for data at back."""
+ h = self._read_data_header(back, oid)
+ return h.tid
+
+ def fail(self, pos, msg, *args):
+ s = ("%s:%s:" + msg) % ((self._name, pos) + args)
+ LOG("FS pack", ERROR, s)
+ raise CorruptedError(s)
+
+ def checkTxn(self, th, pos):
+ if th.tid <= self.ltid:
+ self.fail(pos, "time-stamp reduction: %s <= %s",
+ oid_repr(th.tid), oid_repr(self.ltid))
+ self.ltid = th.tid
+ if th.status == "c":
+ self.fail(pos, "transaction with checkpoint flag set")
+ if not th.status in " pu": # recognize " ", "p", and "u" as valid
+ self.fail(pos, "invalid transaction status: %r", th.status)
+ if th.tlen < th.headerlen():
+ self.fail(pos, "invalid transaction header: "
+ "txnlen (%d) < headerlen(%d)", th.tlen, th.headerlen())
+
+ def checkData(self, th, tpos, dh, pos):
+ if dh.tloc != tpos:
+ self.fail(pos, "data record does not point to transaction header"
+ ": %d != %d", dh.tloc, tpos)
+ if pos + dh.recordlen() > tpos + th.tlen:
+ self.fail(pos, "data record size exceeds transaction size: "
+ "%d > %d", pos + dh.recordlen(), tpos + th.tlen)
+ if dh.prev >= pos:
+ self.fail(pos, "invalid previous pointer: %d", dh.prev)
+ if dh.back:
+ if dh.back >= pos:
+                self.fail(pos, "invalid back pointer: %d", dh.back)
+ if dh.plen:
+ self.fail(pos, "data record has back pointer and data")
+
+def DataHeaderFromString(s):
+ return DataHeader(*struct.unpack(DATA_HDR, s))
+
+class DataHeader(object):
+ """Header for a data record."""
+
+ __slots__ = (
+ "oid", "tid", "prev", "tloc", "vlen", "plen", "back",
+ # These three attributes are only defined when vlen > 0
+ "pnv", "vprev", "version")
+
+ def __init__(self, oid, tid, prev, tloc, vlen, plen):
+ self.back = 0 # default
+ self.version = "" # default
+ self.oid = oid
+ self.tid = tid
+ self.prev = prev
+ self.tloc = tloc
+ self.vlen = vlen
+ self.plen = plen
+
+ def asString(self):
+ s = struct.pack(DATA_HDR, self.oid, self.tid, self.prev,
+ self.tloc, self.vlen, self.plen)
+ if self.version:
+ v = struct.pack(">QQ", self.pnv, self.vprev)
+ return s + v + self.version
+ else:
+ return s
+
+ def setVersion(self, version, pnv, vprev):
+ self.version = version
+ self.vlen = len(version)
+ self.pnv = pnv
+ self.vprev = vprev
+
+ def parseVersion(self, buf):
+ pnv, vprev = struct.unpack(">QQ", buf[:16])
+ self.pnv = pnv
+ self.vprev = vprev
+ self.version = buf[16:]
+
+ def recordlen(self):
+ rlen = DATA_HDR_LEN + (self.plen or 8)
+ if self.version:
+ rlen += 16 + self.vlen
+ return rlen
+
+def TxnHeaderFromString(s):
+ return TxnHeader(*struct.unpack(TRANS_HDR, s))
+
+class TxnHeader(object):
+ """Header for a transaction record."""
+
+ __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
+ "ulen", "dlen", "elen")
+
+ def __init__(self, tid, tlen, status, ulen, dlen, elen):
+ self.tid = tid
+ self.tlen = tlen
+ self.status = status
+ self.ulen = ulen
+ self.dlen = dlen
+ self.elen = elen
+ if elen < 0:
+ self.elen = t32 - elen
+
+ def asString(self):
+ s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
+ self.ulen, self.dlen, self.elen)
+ return "".join([s, self.user, self.descr, self.ext])
+
+ def headerlen(self):
+ return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
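+
+# Round-trip sketch (illustrative, not from the original module): a
+# TxnHeader serialized with asString() can be re-parsed with
+# TxnHeaderFromString():
+#
+#   h = TxnHeader(p64(1), 0, " ", 0, 0, 0)
+#   h.user = h.descr = h.ext = ""     # __init__ does not set these slots
+#   assert TxnHeaderFromString(h.asString()).tid == h.tid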
=== ZODB3/ZODB/FileStorage/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 24 11:02:31 2003
+++ ZODB3/ZODB/FileStorage/__init__.py Wed Dec 24 11:01:59 2003
@@ -0,0 +1,4 @@
+# this is a package
+
+from ZODB.FileStorage.FileStorage \
+ import FileStorage, RecordIterator, FileIterator, packed_version
=== ZODB3/ZODB/FileStorage/FileStorage.py 1.1 => 1.2 === (1627/2027 lines abridged)
--- /dev/null Wed Dec 24 11:02:31 2003
+++ ZODB3/ZODB/FileStorage/FileStorage.py Wed Dec 24 11:01:59 2003
@@ -0,0 +1,2024 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Storage implementation using a log written to a single file.
+
+$Revision$
+"""
+
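+# Typical use (an illustrative sketch; "Data.fs" is a made-up file name):
+#
+#   from ZODB.FileStorage import FileStorage
+#   from ZODB.DB import DB
+#   db = DB(FileStorage("Data.fs"))
+#   conn = db.open()
+#   root = conn.root()
+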
+import base64
+from cPickle import Pickler, Unpickler, loads
+import errno
+import os
+import struct
+import sys
+import time
+from types import StringType, DictType
+from struct import pack, unpack
+
+# Not all platforms have fsync
+fsync = getattr(os, "fsync", None)
+
+from ZODB import BaseStorage, ConflictResolution, POSException
+from ZODB.POSException \
+ import UndoError, POSKeyError, MultipleUndoErrors, VersionLockError
+from persistent.TimeStamp import TimeStamp
+from ZODB.lock_file import LockFile
+from ZODB.utils import p64, u64, cp, z64
+from ZODB.FileStorage.fspack import FileStoragePacker
+from ZODB.FileStorage.format \
+ import FileStorageFormatter, DataHeader, TxnHeader, DATA_HDR, \
+ DATA_HDR_LEN, TRANS_HDR, TRANS_HDR_LEN, CorruptedDataError, \
+ DATA_VERSION_HDR_LEN
+
+try:
+ from ZODB.fsIndex import fsIndex
+except ImportError:
+ def fsIndex():
+ return {}
+
+from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
+
+t32 = 1L << 32
+
+packed_version = "FS21"
+
+def blather(message, *data):
+ LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+ message % data))
+
+def warn(message, *data):
+ LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
+ message % data))
+
+def error(message, *data, **kwargs):
+ LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
+ message % data), **kwargs)
+
+def nearPanic(message, *data):
+ LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
+ message % data))
+
+def panic(message, *data):
+ message = message % data
+ LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, message))
+ raise CorruptedTransactionError(message)
+
+class FileStorageError(POSException.StorageError):
+ pass
+
+class PackError(FileStorageError):
+ pass
+
+class FileStorageFormatError(FileStorageError):
+ """Invalid file format
+
+ The format of the given file is not valid.
+ """
+
+class CorruptedFileStorageError(FileStorageError,
+ POSException.StorageSystemError):
+ """Corrupted file storage."""
+
+class CorruptedTransactionError(CorruptedFileStorageError):
+ pass
+
+class FileStorageQuotaError(FileStorageError,
+ POSException.StorageSystemError):
+ """File storage quota exceeded."""
+
+class TempFormatter(FileStorageFormatter):
+ """Helper class used to read formatted FileStorage data."""
+
+ def __init__(self, afile):
+ self._file = afile
+
+class FileStorage(BaseStorage.BaseStorage,
+ ConflictResolution.ConflictResolvingStorage,
+ FileStorageFormatter):
+
+ # default pack time is 0
+ _packt = z64
+
+ _records_before_save = 10000
+
+ def __init__(self, file_name, create=False, read_only=False, stop=None,
+ quota=None):
+
+ if read_only:
+ self._is_read_only = 1
+ if create:
+ raise ValueError("can't create a read-only file")
+ elif stop is not None:
+ raise ValueError("time-travel only supported in read-only mode")
+
+ if stop is None:
+ stop='\377'*8
+
+ # Lock the database and set up the temp file.
+ if not read_only:
+ # Create the lock file
+ self._lock_file = LockFile(file_name + '.lock')
+ self._tfile = open(file_name + '.tmp', 'w+b')
+ self._tfmt = TempFormatter(self._tfile)
+ else:
+ self._tfile = None
+
+ self._file_name = file_name
+
+ BaseStorage.BaseStorage.__init__(self, file_name)
+
+ (index, vindex, tindex, tvindex,
+ oid2tid, toid2tid, toid2tid_delete) = self._newIndexes()
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2tid, toid2tid, toid2tid_delete)
+
+ # Now open the file
+
+ self._file = None
+ if not create:
+ try:
+ self._file = open(file_name, read_only and 'rb' or 'r+b')
+ except IOError, exc:
+ if exc.errno == errno.EFBIG:
+ # The file is too big to open. Fail visibly.
+ raise
+ if exc.errno == errno.ENOENT:
+ # The file doesn't exist. Create it.
+ create = 1
+ # If something else went wrong, it's hard to guess
+ # what the problem was. If the file does not exist,
+ # create it. Otherwise, fail.
+ if os.path.exists(file_name):
+ raise
+ else:
+ create = 1
+
+ if self._file is None and create:
+ if os.path.exists(file_name):
+ os.remove(file_name)
+ self._file = open(file_name, 'w+b')
+ self._file.write(packed_version)
+
+ r = self._restore_index()
+ if r is not None:
+ self._used_index = 1 # Marker for testing
+ index, vindex, start, maxoid, ltid = r
+
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2tid, toid2tid, toid2tid_delete)
+ self._pos, self._oid, tid = read_index(
+ self._file, file_name, index, vindex, tindex, stop,
+ ltid=ltid, start=start, maxoid=maxoid,
+ read_only=read_only,
+ )
+ else:
+ self._used_index = 0 # Marker for testing
+ self._pos, self._oid, tid = read_index(
+ self._file, file_name, index, vindex, tindex, stop,
+ read_only=read_only,
+ )
+ self._save_index()
+
+ self._records_before_save = max(self._records_before_save,
+ len(self._index))
+ self._ltid = tid
+
+ # self._pos should always point just past the last
[-=- -=- -=- 1627 lines omitted -=- -=- -=-]
+ # now, mimic a read on a closed file.
+ raise IOError, 'iterator is closed'
+
+ pos = self._pos
+ while 1:
+ # Read the transaction record
+ try:
+ h = self._read_txn_header(pos)
+ except CorruptedDataError, err:
+ # If buf is empty, we've reached EOF.
+ if not err.buf:
+ break
+ raise
+
+ if h.tid <= self._ltid:
+ warn("%s time-stamp reduction at %s", self._file.name, pos)
+ self._ltid = h.tid
+
+ if self._stop is not None and h.tid > self._stop:
+ raise IndexError, index
+
+ if h.status == "c":
+ # Assume we've hit the last, in-progress transaction
+ raise IndexError, index
+
+ if pos + h.tlen + 8 > self._file_size:
+ # Hm, the data were truncated or the checkpoint flag wasn't
+ # cleared. They may also be corrupted,
+ # in which case, we don't want to totally lose the data.
+ warn("%s truncated, possibly due to damaged records at %s",
+ self._file.name, pos)
+ break
+
+ if h.status not in " up":
+ warn('%s has invalid status, %s, at %s', self._file.name,
+ h.status, pos)
+
+ if h.tlen < h.headerlen():
+ # We're in trouble. Find out if this is bad data in
+ # the middle of the file, or just a turd that Win 9x
+ # dropped at the end when the system crashed. Skip to
+ # the end and read what should be the transaction
+ # length of the last transaction.
+ self._file.seek(-8, 2)
+ rtl = u64(self._file.read(8))
+ # Now check to see if the redundant transaction length is
+ # reasonable:
+ if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
+ nearPanic("%s has invalid transaction header at %s",
+ self._file.name, pos)
+ warn("It appears that there is invalid data at the end of "
+ "the file, possibly due to a system crash. %s "
+ "truncated to recover from bad data at end."
+ % self._file.name)
+ break
+ else:
+ warn("%s has invalid transaction header at %s",
+ self._file.name, pos)
+ break
+
+ tpos = pos
+ tend = tpos + h.tlen
+
+ if h.status != "u":
+ pos = tpos + h.headerlen()
+ user = self._file.read(h.ulen)
+ description = self._file.read(h.dlen)
+ e = {}
+ if h.elen:
+ try:
+ e = loads(self._file.read(h.elen))
+ except:
+ pass
+
+ result = RecordIterator(h.tid, h.status, user, description,
+ e, pos, tend, self._file, tpos)
+
+ # Read the (intentionally redundant) transaction length
+ self._file.seek(tend)
+ rtl = u64(self._file.read(8))
+ if rtl != h.tlen:
+ warn("%s redundant transaction length check failed at %s",
+ self._file.name, tend)
+ break
+ self._pos = tend + 8
+
+ return result
+
+ raise IndexError, index
+
+class RecordIterator(Iterator, BaseStorage.TransactionRecord,
+ FileStorageFormatter):
+ """Iterate over the transactions in a FileStorage file."""
+ def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
+ self.tid = tid
+ self.status = status
+ self.user = user
+ self.description = desc
+ self._extension = ext
+ self._pos = pos
+ self._tend = tend
+ self._file = file
+ self._tpos = tpos
+
+ def next(self, index=0):
+ pos = self._pos
+ while pos < self._tend:
+ # Read the data records for this transaction
+ h = self._read_data_header(pos)
+ dlen = h.recordlen()
+
+ if pos + dlen > self._tend or h.tloc != self._tpos:
+ warn("%s data record exceeds transaction record at %s",
+                     self._file.name, pos)
+ break
+
+ self._pos = pos + dlen
+ prev_txn = None
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ if h.back == 0:
+ # If the backpointer is 0, then this transaction
+ # undoes the object creation. It either aborts
+ # the version that created the object or undid the
+ # transaction that created it. Return None
+ # instead of a pickle to indicate this.
+ data = None
+ else:
+ data, tid = self._loadBackTxn(h.oid, h.back, False)
+ # XXX looks like this only goes one link back, should
+ # it go to the original data like BDBFullStorage?
+ prev_txn = self.getTxnFromData(h.oid, h.back)
+
+ r = Record(h.oid, h.tid, h.version, data, prev_txn)
+
+ return r
+
+ raise IndexError, index
+
+class Record(BaseStorage.DataRecord):
+ """An abstract database record."""
+ def __init__(self, *args):
+ self.oid, self.tid, self.version, self.data, self.data_txn = args
+
+class UndoSearch:
+
+ def __init__(self, file, pos, packt, first, last, filter=None):
+ self.file = file
+ self.pos = pos
+ self.packt = packt
+ self.first = first
+ self.last = last
+ self.filter = filter
+ self.i = 0
+ self.results = []
+ self.stop = 0
+
+ def finished(self):
+ """Return True if UndoSearch has found enough records."""
+ # BAW: Why 39 please? This makes no sense (see also below).
+ return self.i >= self.last or self.pos < 39 or self.stop
+
+ def search(self):
+ """Search for another record."""
+ dict = self._readnext()
+ if dict is not None and (self.filter is None or self.filter(dict)):
+ if self.i >= self.first:
+ self.results.append(dict)
+ self.i += 1
+
+ def _readnext(self):
+ """Read the next record from the storage."""
+ self.file.seek(self.pos - 8)
+ self.pos -= u64(self.file.read(8)) + 8
+ self.file.seek(self.pos)
+ h = self.file.read(TRANS_HDR_LEN)
+ tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
+ if tid < self.packt or status == 'p':
+ self.stop = 1
+ return None
+ if status != ' ':
+ return None
+ d = u = ''
+ if ul:
+ u = self.file.read(ul)
+ if dl:
+ d = self.file.read(dl)
+ e = {}
+ if el:
+ try:
+ e = loads(self.file.read(el))
+ except:
+ pass
+ d = {'id': base64.encodestring(tid).rstrip(),
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': u,
+ 'description': d}
+ d.update(e)
+ return d