[Zodb-checkins] CVS: Zope3/src/zodb/storage/file - pack.py:1.2
main.py:1.2 index.py:1.2 format.py:1.2 errors.py:1.2
dump.py:1.2 copy.py:1.2 __init__.py:1.2
Jeremy Hylton
jeremy at zope.com
Tue Apr 22 12:23:46 EDT 2003
Update of /cvs-repository/Zope3/src/zodb/storage/file
In directory cvs.zope.org:/tmp/cvs-serv13741/file
Added Files:
pack.py main.py index.py format.py errors.py dump.py copy.py
__init__.py
Log Message:
Merge the jeremy-new-pack-branch to the trunk.
The primary change is a completely new implementation of file storage pack.
=== Zope3/src/zodb/storage/file/pack.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/pack.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,457 @@
+"""FileStorage helper to find all reachable object as of a certain time.
+
+A storage contains an ordered set of object revisions. When a storage
+is packed, object revisions that are not reachable as of the pack time
+are deleted. The notion of reachability is complicated by
+backpointers -- object revisions that point to earlier revisions of
+the same object.
+
+An object revision is reachable at a certain time if it is reachable
+from the revision of the root at that time or if it is reachable from
+a backpointer after that time.
+"""
+
+import logging
+import os
+import stat
+
+from zodb.interfaces import ZERO, _fmt_oid
+from zodb.utils import p64, u64
+from zodb.storage.base import splitrefs
+from zodb.storage.file.copy import DataCopier
+from zodb.storage.file.errors import CorruptedError
+from zodb.storage.file.format import FileStorageFormatter
+from zodb.storage.file.index import fsIndex
+
+logger = logging.getLogger("zodb.storage.file")
+
+class GC(FileStorageFormatter):
+
+ def __init__(self, file, eof, packtime):
+ self._file = file
+ self._name = file.name
+ self.eof = eof
+ self.packtime = packtime
+ # packpos: position of first txn header after pack time
+ self.packpos = None
+        self.oid2curpos = {}  # maps oid to current data record position
+        self.oid2verpos = {}  # maps oid to position of current version data
+
+ # The set of reachable revisions of each object.
+ #
+        # This set is managed using two data structures.  The first is
+        # an fsIndex mapping oids to one data record pos.  Since only
+        # a few objects will have more than one revision, we use this
+        # efficient data structure to handle the common case.  The
+        # second is a dictionary mapping oids to lists of
+        # positions; it is used to handle the small number of objects
+        # for which we must keep multiple revisions.
+
+ self.reachable = fsIndex()
+ self.reach_ex = {}
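+        # For example, after marking, an oid with one live revision at
+        # file position 4096 and an extra backpointed revision at 8192
+        # (positions here are illustrative) would appear as:
+        #     self.reachable[oid] = 4096
+        #     self.reach_ex[oid] = [8192]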
+
+ # keep ltid for consistency checks during initial scan
+ self.ltid = ZERO
+
+ def isReachable(self, oid, pos):
+ """Return True if revision of `oid` at `pos` is reachable."""
+
+ rpos = self.reachable.get(oid)
+ if rpos is None:
+ return False
+ if rpos == pos:
+ return True
+ return pos in self.reach_ex.get(oid, [])
+
+ def findReachable(self):
+ self.buildPackIndex()
+ self.findReachableAtPacktime([ZERO])
+ self.findReachableFromFuture()
+
+ def buildPackIndex(self):
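+        """Build oid -> position indexes for data as of the pack time.
+
+        Populates oid2curpos and oid2verpos, and sets packpos to the
+        position of the first transaction after the pack time.
+        """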
+ pos = 1024
+ while pos < self.eof:
+ th = self._read_txn_header(pos)
+ if th.tid > self.packtime:
+ break
+ self.checkTxn(th, pos)
+
+ tpos = pos
+ end = pos + th.tlen
+ pos += th.headerlen()
+
+ while pos < end:
+ dh = self._read_data_header(pos)
+ self.checkData(th, tpos, dh, pos)
+ if dh.version:
+ self.oid2verpos[dh.oid] = pos
+ else:
+ self.oid2curpos[dh.oid] = pos
+ pos += dh.recordlen()
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+ pos += 8
+
+ self.packpos = pos
+
+ def findReachableAtPacktime(self, roots):
+ """Mark all objects reachable from the oids in roots as reachable."""
+ todo = list(roots)
+ while todo:
+ oid = todo.pop()
+ if oid in self.reachable:
+ continue
+
+ L = []
+
+ pos = self.oid2curpos.get(oid)
+ if pos is not None:
+ L.append(pos)
+ todo.extend(self.findrefs(pos))
+
+ pos = self.oid2verpos.get(oid)
+ if pos is not None:
+ L.append(pos)
+ todo.extend(self.findrefs(pos))
+
+ if not L:
+ continue
+
+ pos = L.pop()
+ self.reachable[oid] = pos
+ if L:
+ self.reach_ex[oid] = L
+
+ def findReachableFromFuture(self):
+ # In this pass, the roots are positions of object revisions.
+ # We add a pos to extra_roots when there is a backpointer to a
+ # revision that was not current at the packtime. The
+ # non-current revision could refer to objects that were
+ # otherwise unreachable at the packtime.
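+        # Example: a post-packtime undo of an object writes a record
+        # whose backpointer (dh.back) points at the object's
+        # pre-packtime revision.  That revision -- and, via extra_roots,
+        # everything it references -- must be kept even though it was
+        # not current at the pack time.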
+ extra_roots = []
+
+ pos = self.packpos
+ while pos < self.eof:
+ th = self._read_txn_header(pos)
+ self.checkTxn(th, pos)
+ tpos = pos
+ end = pos + th.tlen
+ pos += th.headerlen()
+
+ while pos < end:
+ dh = self._read_data_header(pos)
+ self.checkData(th, tpos, dh, pos)
+
+ if dh.back and dh.back < self.packpos:
+ if dh.oid in self.reachable:
+ L = self.reach_ex.setdefault(dh.oid, [])
+ if dh.back not in L:
+ L.append(dh.back)
+ extra_roots.append(dh.back)
+ else:
+ self.reachable[dh.oid] = dh.back
+
+ if dh.version:
+ if dh.oid in self.reachable:
+ L = self.reach_ex.setdefault(dh.oid, [])
+ if dh.pnv not in L:
+ L.append(dh.pnv)
+ extra_roots.append(dh.pnv)
+ else:
+                        self.reachable[dh.oid] = dh.pnv
+
+ pos += dh.recordlen()
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                              tlen, th.tlen)
+ pos += 8
+
+ for pos in extra_roots:
+ refs = self.findrefs(pos)
+ self.findReachableAtPacktime(refs)
+
+ def findrefs(self, pos):
+ """Return a list of oids referenced as of packtime."""
+ dh = self._read_data_header(pos)
+ # Chase backpointers until we get to the record with the refs
+ while dh.back:
+ dh = self._read_data_header(dh.back)
+ return splitrefs(self._file.read(dh.nrefs * 8))
+
+class PackCopier(DataCopier):
+
+ # PackCopier has to cope with _file and _tfile being the
+ # same file. The copy() implementation is written assuming
+ # that they are different, so that using one object doesn't
+ # mess up the file pointer for the other object.
+
+ # PackCopier overrides _resolve_backpointer() and _restore_pnv()
+ # to guarantee that they keep the file pointer for _tfile in
+ # the right place.
+
+ def __init__(self, f, index, vindex, tindex, tvindex):
+ self._file = f
+ self._tfile = f
+ self._index = index
+ self._vindex = vindex
+ self._tindex = tindex
+ self._tvindex = tvindex
+ self._pos = None
+
+ def setTxnPos(self, pos):
+ self._pos = pos
+
+ def _resolve_backpointer(self, prev_txn, oid, data):
+ pos = self._tfile.tell()
+ try:
+ return DataCopier._resolve_backpointer(self, prev_txn, oid, data)
+ finally:
+ self._tfile.seek(pos)
+
+ def _restore_pnv(self, oid, prev, version, bp):
+ pos = self._tfile.tell()
+ try:
+ return DataCopier._restore_pnv(self, oid, prev, version, bp)
+ finally:
+ self._tfile.seek(pos)
+
+class FileStoragePacker(FileStorageFormatter):
+
+ def __init__(self, path, stop, la, lr, cla, clr):
+ self._name = path
+ self._file = open(path, "rb")
+ self._stop = stop
+ self._packt = None
+ self.locked = False
+ self.file_end = os.stat(self._file.name)[stat.ST_SIZE]
+
+ self.gc = GC(self._file, self.file_end, self._stop)
+
+ # The packer needs to acquire the parent's commit lock
+ # during the copying stage, so the two sets of lock acquire
+ # and release methods are passed to the constructor.
+ self._lock_acquire = la
+ self._lock_release = lr
+ self._commit_lock_acquire = cla
+ self._commit_lock_release = clr
+
+ # The packer will use several indexes.
+ # index: oid -> pos
+ # vindex: version -> pos of XXX
+ # tindex: oid -> pos, for current txn
+ # tvindex: version -> pos of XXX, for current txn
+
+ self.index = fsIndex()
+ self.vindex = {}
+ self.tindex = {}
+ self.tvindex = {}
+
+ # Index for non-version data. This is a temporary structure
+ # to reduce I/O during packing
+ self.nvindex = fsIndex()
+
+ def pack(self):
+ # Pack copies all data reachable at the pack time or later.
+ #
+ # Copying occurs in two phases. In the first phase, txns
+        # before the pack time are copied if they contain any reachable
+ # data. In the second phase, all txns after the pack time
+ # are copied.
+ #
+ # Txn and data records contain pointers to previous records.
+ # Because these pointers are stored as file offsets, they
+ # must be updated when we copy data.
+
+ # XXX Need to add sanity checking to pack
+
+ self.gc.findReachable()
+
+        # Set up the destination file and copy the metadata.
+ # XXX rename from _tfile to something clearer
+ self._tfile = open(self._name + ".pack", "w+b")
+ self._file.seek(0)
+ self._tfile.write(self._file.read(self._metadata_size))
+
+ self._copier = PackCopier(self._tfile, self.index, self.vindex,
+ self.tindex, self.tvindex)
+
+ ipos, opos = self.copyToPacktime()
+ assert ipos == self.gc.packpos
+ if ipos == opos:
+            # Pack didn't free any data; there's no point in continuing.
+ self._tfile.close()
+ os.remove(self._name + ".pack")
+ return None
+ if ipos < self.file_end:
+ self.copyRest(ipos)
+
+ # OK, we've copied everything. Now we need to wrap things up.
+ pos = self._tfile.tell()
+ self._tfile.flush()
+ self._tfile.close()
+ self._file.close()
+
+ return pos
+
+ def copyToPacktime(self):
+ offset = 0L # the amount of space freed by packing
+ pos = self._metadata_size
+ new_pos = pos
+
+ while pos < self.gc.packpos:
+ th = self._read_txn_header(pos)
+ new_tpos, pos = self.copyDataRecords(pos, th)
+
+ if new_tpos:
+ new_pos = self._tfile.tell() + 8
+ tlen = new_pos - new_tpos - 8
+ # Update the transaction length
+ self._tfile.seek(new_tpos + 8)
+ self._tfile.write(p64(tlen))
+ self._tfile.seek(new_pos - 8)
+ self._tfile.write(p64(tlen))
+
+
+ tlen = self._read_num(pos)
+ if tlen != th.tlen:
+ self.fail(pos, "redundant transaction length does not "
+ "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+ pos += 8
+
+ return pos, new_pos
+
+ def fetchBackpointer(self, oid, back):
+ """Return data and refs backpointer `back` to object `oid.
+
+ If `back` is 0 or ultimately resolves to 0, return None
+ and None. In this case, the transaction undoes the object
+ creation.
+ """
+ if back == 0:
+ return None, None
+ data, refs, serial, tid = self._loadBackTxn(oid, back, False)
+ return data, refs
+
+ def copyDataRecords(self, pos, th):
+ """Copy any current data records between pos and tend.
+
+ Returns position of txn header in output file and position
+ of next record in the input file.
+
+ If any data records are copied, also write txn header (th).
+ """
+ copy = False
+ new_tpos = 0
+ tend = pos + th.tlen
+ pos += th.headerlen()
+ while pos < tend:
+ h = self._read_data_header(pos)
+ if not self.gc.isReachable(h.oid, pos):
+ pos += h.recordlen()
+ continue
+ pos += h.recordlen()
+
+ # If we are going to copy any data, we need to copy
+ # the transaction header. Note that we will need to
+ # patch up the transaction length when we are done.
+ if not copy:
+ th.status = "p"
+ s = th.asString()
+ new_tpos = self._tfile.tell()
+ self._tfile.write(s)
+ new_pos = new_tpos + len(s)
+ copy = True
+
+ if h.plen:
+ refs = self._file.read(8 * h.nrefs)
+ data = self._file.read(h.plen)
+ else:
+ # If a current record has a backpointer, fetch
+ # refs and data from the backpointer. We need
+ # to write the data in the new record.
+ data, refs = self.fetchBackpointer(h.oid, h.back)
+ if refs is not None:
+ refs = "".join(refs)
+
+ self.writePackedDataRecord(h, data, refs, new_tpos)
+ new_pos = self._tfile.tell()
+
+ return new_tpos, pos
+
+ def writePackedDataRecord(self, h, data, refs, new_tpos):
+ # Update the header to reflect current information, then write
+ # it to the output file.
+ if data is None:
+ data = ""
+ if refs is None:
+ refs = ""
+ h.prev = 0
+ h.back = 0
+ h.plen = len(data)
+ h.nrefs = len(refs) / 8
+ h.tloc = new_tpos
+ pos = self._tfile.tell()
+ if h.version:
+ h.pnv = self.index.get(h.oid, 0)
+ h.vprev = self.vindex.get(h.version, 0)
+ self.vindex[h.version] = pos
+ self.index[h.oid] = pos
+ self._tfile.write(h.asString())
+ self._tfile.write(refs)
+ self._tfile.write(data)
+ if not data:
+ # Packed records never have backpointers (?).
+ # If there is no data, write a ZERO backpointer.
+ # This is a George Bailey event.
+ self._tfile.write(ZERO)
+
+ def copyRest(self, ipos):
+ # After the pack time, all data records are copied.
+ # Copy one txn at a time, using copy() for data.
+
+ while ipos < self.file_end:
+ th = self._read_txn_header(ipos)
+ pos = self._tfile.tell()
+ self._copier.setTxnPos(pos)
+ self._tfile.write(th.asString())
+ tend = ipos + th.tlen
+ ipos += th.headerlen()
+
+ while ipos < tend:
+ h = self._read_data_header(ipos)
+ ipos += h.recordlen()
+ if h.nrefs:
+ refs = splitrefs(self._file.read(h.nrefs * 8))
+ else:
+ refs = []
+ prev_txn = None
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ data, refs = self.fetchBackpointer(h.oid, h.back)
+ if h.back:
+ prev_txn = self.getTxnFromData(h.oid, h.back)
+
+ self._copier.copy(h.oid, h.serial, data, refs, h.version,
+ prev_txn, pos, self._tfile.tell())
+
+ tlen = self._tfile.tell() - pos
+ assert tlen == th.tlen
+ self._tfile.write(p64(tlen))
+ ipos += 8
+
+ self.index.update(self.tindex)
+ self.tindex.clear()
+ self.vindex.update(self.tvindex)
+ self.tvindex.clear()
+
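+# A hedged usage sketch (not part of this checkin): roughly how the
+# storage's own pack() method might drive FileStoragePacker.  The lock
+# callables come from the storage and `packtime` is an 8-byte p64
+# value; the storage attribute names below are illustrative
+# assumptions.
+#
+#     packer = FileStoragePacker(path, packtime,
+#                                storage._lock_acquire,
+#                                storage._lock_release,
+#                                storage._commit_lock_acquire,
+#                                storage._commit_lock_release)
+#     opos = packer.pack()
+#     if opos is not None:
+#         # pack freed space; the packed copy replaces the original
+#         os.rename(path + ".pack", path)
+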
=== Zope3/src/zodb/storage/file/main.py 1.1 => 1.2 === (1142/1242 lines abridged)
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/main.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,1239 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""File-based ZODB storage
+
+$Id$
+"""
+
+from __future__ import generators
+
+import os
+import sys
+import time
+import errno
+import base64
+import struct
+import logging
+from struct import pack, unpack
+from cPickle import Pickler, Unpickler, loads
+
+try:
+ from posix import fsync
+except ImportError:
+ fsync = None
+
+import zodb.db
+from zodb.storage.base import BaseStorage, splitrefs
+from zodb import conflict
+from zodb.interfaces import *
+from zodb.timestamp import TimeStamp, newTimeStamp, timeStampFromTime
+from zodb.lockfile import LockFile
+from zodb.utils import p64, u64, cp
+from zodb.storage.file.index import fsIndex
+from zodb.storage.interfaces import *
+
+from zodb.storage.file.copy import DataCopier
+from zodb.storage.file.errors import *
[-=- -=- -=- 1142 lines omitted -=- -=- -=-]
+ self.first = first
+ self.last = last
+ self.filter = filter
+ self.i = 0
+ self.results = []
+ self.stop = 0
+
+ def finished(self):
+ """Return True if UndoSearch has found enough records."""
+ # The first txn record is at 1024, so pos must be >= 1024
+ return self.i >= self.last or self.pos < 1024 or self.stop
+
+ def search(self):
+ """Search for another record."""
+        d = self._readnext()
+        if d is None:
+            return
+        if self.filter is None or self.filter(d):
+            if self.i >= self.first:
+                self.results.append(d)
+            self.i += 1
+
+ def _readnext(self):
+ """Read the next record from the storage."""
+ self._file.seek(self.pos - 8)
+ self.pos -= u64(self._file.read(8)) + 8
+ if self.pos < 1024:
+ return None
+ h = self._read_txn_header(self.pos)
+ if h.tid < self.packt or h.status == 'p':
+ self.stop = 1
+ return None
+ assert h.status == " "
+ d = {'id': base64.encodestring(h.tid).rstrip(),
+ 'time': TimeStamp(h.tid).timeTime(),
+ 'user_name': h.user,
+ 'description': h.descr}
+ if h.ext:
+ ext = loads(h.ext)
+ d.update(ext)
+ return d
+
+def cleanup(filename):
+ """Remove all FileStorage related files."""
+ for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
+ try:
+ os.remove(filename + ext)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
=== Zope3/src/zodb/storage/file/index.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/index.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,128 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Implement an OID to File-position (long integer) mapping."""
+
+# To save space, we do two things:
+#
+# 1. We split the keys (OIDs) into 6-byte prefixes and 2-byte suffixes.
+# We use the prefixes as keys in a mapping from prefix to mappings
+# of suffix to data:
+#
+# data is {prefix -> {suffix -> data}}
+#
+# 2. We limit the data size to 48 bits. This should allow databases
+# as large as 256 terabytes.
+#
+# Most of the space is consumed by items in the mappings from 2-byte
+# suffix to 6-byte data. This should reduce the overall memory usage to
+# 8-16 bytes per OID.
+#
+# Since the mapping from suffix to data contains at most 65536 entries,
+# we use a BTree bucket instead of a full BTree to store the results.
+#
+# We use p64 to convert integers to 8-byte strings and lop off the two
+# high-order bytes when saving. On loading data, we add the leading
+# bytes back before using u64 to convert the data back to (long)
+# integers.
+
+from __future__ import generators
+import struct
+
+from zodb.btrees._fsBTree import fsBucket
+
+# convert between numbers and six-byte strings
+
+def num2str(n):
+ return struct.pack(">Q", n)[2:]
+
+def str2num(s):
+ return struct.unpack(">Q", "\000\000" + s)[0]
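+
+# For example (illustrative):
+#
+#     num2str(1024)        -> '\x00\x00\x00\x00\x04\x00'
+#     str2num(num2str(n))  == n, for any 0 <= n < 2**48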
+
+class fsIndex:
+
+ def __init__(self):
+ self._data = {}
+
+ def __getitem__(self, key):
+ return str2num(self._data[key[:6]][key[6:]])
+
+ def get(self, key, default=None):
+ tree = self._data.get(key[:6], default)
+ if tree is default:
+ return default
+ v = tree.get(key[6:], default)
+ if v is default:
+ return default
+ return str2num(v)
+
+ def __setitem__(self, key, value):
+ value = num2str(value)
+ treekey = key[:6]
+ tree = self._data.get(treekey)
+ if tree is None:
+ tree = fsBucket()
+ self._data[treekey] = tree
+ tree[key[6:]] = value
+
+ def __len__(self):
+ r = 0
+ for tree in self._data.values():
+ r += len(tree)
+ return r
+
+ def update(self, mapping):
+ for k, v in mapping.items():
+ self[k] = v
+
+ def has_key(self, key):
+        v = self.get(key, self)
+ return v is not self
+
+ def __contains__(self, key):
+ tree = self._data.get(key[:6])
+ if tree is None:
+ return False
+ v = tree.get(key[6:], None)
+ if v is None:
+ return False
+ return True
+
+ def clear(self):
+ self._data.clear()
+
+ def __iter__(self):
+ for prefix, tree in self._data.items():
+ for suffix in tree:
+ yield prefix + suffix
+
+ def keys(self):
+ r = []
+ for prefix, tree in self._data.items():
+ for suffix in tree.keys():
+ r.append(prefix + suffix)
+ return r
+
+ def items(self):
+ r = []
+ for prefix, tree in self._data.items():
+ for suffix, v in tree.items():
+ r.append(((prefix + suffix), str2num(v)))
+ return r
+
+ def values(self):
+ r = []
+ for prefix, tree in self._data.items():
+ for v in tree.values():
+ r.append(str2num(v))
+ return r
=== Zope3/src/zodb/storage/file/format.py 1.1 => 1.2 === (458/558 lines abridged)
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/format.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,555 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tools for working with the low-level FileStorage format.
+
+Files are arranged as follows.
+
+ - The first 1024 bytes are a storage metadata section.
+
+ The first two bytes are the characters F and S.
+ The next two bytes are a storage format version id, currently 43.
+ The next four bytes are the database version string.
+
+ The rest of the section is reserved.
+
+A transaction record consists of:
+
+ - 8-byte transaction id, which is also a time stamp.
+
+ - 8-byte transaction record length - 8.
+
+ - 1-byte status code
+
+ - 2-byte length of user name
+
+ - 2-byte length of description
+
+ - 2-byte length of extension attributes
+
+ - user name
+
+ - description
+
+ - extension attributes
+
+ * A sequence of data records
+
[-=- -=- -=- 458 lines omitted -=- -=- -=-]
+ self.tloc, self.vlen, self.nrefs, self.plen)
+ if self.version:
+ v = struct.pack(">QQ", self.pnv, self.vprev)
+ return s + v + self.version
+ else:
+ return s
+
+ def setVersion(self, version, pnv, vprev):
+ self.version = version
+ self.vlen = len(version)
+ self.pnv = pnv
+ self.vprev = vprev
+
+ def parseVersion(self, buf):
+ self.pnv, self.vprev = struct.unpack(">QQ", buf[:16])
+ self.version = buf[16:]
+
+ def recordlen(self):
+ rlen = DATA_HDR_LEN + (self.nrefs * 8) + (self.plen or 8)
+ if self.version:
+ rlen += 16 + self.vlen
+ return rlen
+
+class TxnHeader:
+ """Header for a transaction record."""
+
+ __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
+ "ulen", "dlen", "elen")
+
+ def __init__(self, tid, tlen, status, ulen, dlen, elen):
+ self.tid = tid
+ self.tlen = tlen
+ self.status = status
+ self.ulen = ulen
+ self.dlen = dlen
+ self.elen = elen
+
+ def fromString(cls, s):
+ return cls(*struct.unpack(TRANS_HDR, s))
+
+ fromString = classmethod(fromString)
+
+ def asString(self):
+ s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
+ self.ulen, self.dlen, self.elen)
+ return "".join([s, self.user, self.descr, self.ext])
+
+ def headerlen(self):
+ return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
+
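+# A hedged round-trip sketch, assuming TRANS_HDR packs the fields in
+# the order the constructor takes them (8-byte tid, 8-byte tlen,
+# 1-byte status, 2-byte user/description/extension lengths):
+#
+#     h = TxnHeader(tid, tlen, " ", 0, 0, 0)   # tid: 8-byte string
+#     h.user = h.descr = h.ext = ""
+#     h2 = TxnHeader.fromString(h.asString()[:TRANS_HDR_LEN])
+#     assert h2.tid == h.tid and h2.tlen == h.tlen
+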
=== Zope3/src/zodb/storage/file/errors.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/errors.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,40 @@
+"""FileStorage-specific exceptions."""
+
+from zodb.interfaces import _fmt_oid
+from zodb.storage.interfaces import StorageError, StorageSystemError
+
+class FileStorageError(StorageError):
+ pass
+
+class PackError(FileStorageError):
+ pass
+
+class FileStorageFormatError(FileStorageError):
+ """Invalid file format
+
+ The format of the given file is not valid.
+ """
+
+class CorruptedError(FileStorageError, StorageSystemError):
+ """Corrupted file storage."""
+
+class CorruptedDataError(CorruptedError):
+
+ def __init__(self, oid=None, buf=None, pos=None):
+ self.oid = oid
+ self.buf = buf
+ self.pos = pos
+
+ def __str__(self):
+ if self.oid:
+ msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
+ self.buf)
+ else:
+ msg = "Error reading unknown oid. Found %r" % self.buf
+ if self.pos:
+ msg += " at %d" % self.pos
+ return msg
+
+class FileStorageQuotaError(FileStorageError, StorageSystemError):
+ """File storage quota exceeded."""
+
=== Zope3/src/zodb/storage/file/dump.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/dump.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,108 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A low-level utility to dump the internal FileStorage representation."""
+
+import struct
+from zodb.storage.file.format \
+ import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
+from zodb.utils import u64
+from zodb.storage.base import splitrefs
+from zodb.storage.tests.base import zodb_unpickle
+
+def fmt(p64):
+    # Return a nicely formatted string for a packed 64-bit value
+ return "%016x" % u64(p64)
+
+def dump(path, dest=None):
+ Dumper(path, dest).dump()
+
+class Dumper:
+ """A very verbose dumper for debugging FileStorage problems."""
+
+ def __init__(self, path, dest=None):
+ self.file = open(path, "rb")
+ self.dest = dest
+
+ def dump(self):
+ fid = self.file.read(1024)
+ print >> self.dest, "*" * 60
+ print >> self.dest, "file identifier: %r" % fid[:4]
+ print >> self.dest, "database version: %r" % fid[4:8]
+ # XXX perhaps verify that the rest of the metadata is nulls?
+ while self.dump_txn():
+ pass
+
+ def dump_txn(self):
+ pos = self.file.tell()
+ h = self.file.read(TRANS_HDR_LEN)
+ if not h:
+ return False
+ tid, tlen, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
+ end = pos + tlen
+ print >> self.dest, "=" * 60
+ print >> self.dest, "offset: %d" % pos
+ print >> self.dest, "end pos: %d" % end
+ print >> self.dest, "transaction id: %s" % fmt(tid)
+ print >> self.dest, "trec len: %d" % tlen
+ print >> self.dest, "status: %r" % status
+ user = descr = extra = ""
+ if ul:
+ user = self.file.read(ul)
+ if dl:
+ descr = self.file.read(dl)
+ if el:
+ extra = self.file.read(el)
+ print >> self.dest, "user: %r" % user
+ print >> self.dest, "description: %r" % descr
+ print >> self.dest, "len(extra): %d" % el
+ while self.file.tell() < end:
+ self.dump_data(pos)
+ tlen2 = u64(self.file.read(8))
+ print >> self.dest, "redundant trec len: %d" % tlen2
+ return True
+
+ def dump_data(self, tloc):
+ pos = self.file.tell()
+ h = self.file.read(DATA_HDR_LEN)
+ assert len(h) == DATA_HDR_LEN
+ oid, revid, prev, tloc, vlen, nrefs, dlen = struct.unpack(DATA_HDR, h)
+ print >> self.dest, "-" * 60
+ print >> self.dest, "offset: %d" % pos
+ print >> self.dest, "oid: %s" % fmt(oid)
+ print >> self.dest, "revid: %s" % fmt(revid)
+ print >> self.dest, "previous record offset: %d" % prev
+ print >> self.dest, "transaction offset: %d" % tloc
+ if vlen:
+ pnv = self.file.read(8)
+ sprevdata = self.file.read(8)
+ version = self.file.read(vlen)
+ print >> self.dest, "version: %r" % version
+ print >> self.dest, "non-version data offset: %d" % u64(pnv)
+ print >> self.dest, \
+ "previous version data offset: %d" % u64(sprevdata)
+ print >> self.dest, 'numrefs:', nrefs
+ for ref in splitrefs(self.file.read(nrefs * 8)):
+ print >> self.dest, '\t%s' % fmt(ref)
+ print >> self.dest, "len(data): %d" % dlen
+ data = self.file.read(dlen)
+ # A debugging feature for use with the test suite.
+ if data.startswith("(czodb.storage.tests.minpo\nMinPO\n"):
+ print >> self.dest, "value: %r" % zodb_unpickle(data).value
+ if not dlen:
+ sbp = self.file.read(8)
+ print >> self.dest, "backpointer: %d" % u64(sbp)
+
+if __name__ == "__main__":
+ import sys
+ Dumper(sys.argv[1]).dump()
=== Zope3/src/zodb/storage/file/copy.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/copy.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1,157 @@
+"""Create copy of a data record."""
+
+import logging
+
+from zodb.interfaces import MAXTID, ZERO, UndoError
+from zodb.utils import p64, u64
+from zodb.storage.file.format import FileStorageFormatter, DataHeader
+from zodb.storage.file.format \
+    import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR_LEN, DATA_VERSION_HDR_LEN
+
+logger = logging.getLogger("zodb.storage.file")
+
+class DataCopier(FileStorageFormatter):
+ """Mixin class for copying transactions into a storage.
+
+ The restore() and pack() methods share a need to copy data records
+ and update pointers to data in earlier transaction records. This
+ class provides the shared logic.
+
+ The mixin extends the FileStorageFormatter with a copy() method.
+ It also requires that the concrete class provides the following
+ attributes:
+
+ _file -- file with earlier destination data
+ _tfile -- destination file for copied data
+ _packt -- p64() representation of latest pack time
+    _pos -- file pos of destination transaction
+    _index -- maps oid to data record file pos
+    _vindex -- maps version name to data record file pos
+    _tindex -- maps oid to data record file pos, for current txn
+    _tvindex -- maps version name to data record file pos, for current txn
+
+ _tindex and _tvindex are updated by copy().
+
+ The copy() method does not do any locking.
+ """
+
+ def _txn_find(self, tid, stop_at_pack):
+ # _pos always points just past the last transaction
+ pos = self._pos
+ while pos > 1024:
+ self._file.seek(pos - 8)
+ pos = pos - u64(self._file.read(8)) - 8
+ self._file.seek(pos)
+ h = self._file.read(TRANS_HDR_LEN)
+ _tid = h[:8]
+ if _tid == tid:
+ return pos
+ if stop_at_pack:
+ # check the status field of the transaction header
+ # XXX _packt seems to be either ZERO or None
+ if h[16] == 'p' or _tid < self._packt:
+ break
+ raise UndoError(None, "Invalid transaction id")
+
+ def _data_find(self, tpos, oid, data):
+        # Return the file position of the data record for oid in the
+        # transaction at tpos, or 0 on failure.  The record found
+        # should contain a pickle identical to data.
+ # Must call with lock held.
+ h = self._read_txn_header(tpos)
+ tend = tpos + h.tlen
+ pos = self._file.tell()
+ while pos < tend:
+ h = self._read_data_header(pos)
+ if h.oid == oid:
+ # Read past any references data
+ self._file.read(h.nrefs * 8)
+ # Make sure this looks like the right data record
+ if h.plen == 0:
+ # This is also a backpointer. Gotta trust it.
+ return pos
+ if h.plen != len(data):
+ # The expected data doesn't match what's in the
+ # backpointer. Something is wrong.
+ error("Mismatch between data and backpointer at %d", pos)
+ return 0
+ _data = self._file.read(h.plen)
+ if data != _data:
+ return 0
+ return pos
+ pos += h.recordlen()
+ return 0
+
+ def _restore_pnv(self, oid, prev, version, bp):
+ # Find a valid pnv (previous non-version) pointer for this version.
+
+ # If there is no previous record, there can't be a pnv.
+ if not prev:
+ return None
+
+ pnv = None
+ h = self._read_data_header(prev, oid)
+ # If the previous record is for a version, it must have
+ # a valid pnv.
+ if h.version:
+ return h.pnv
+ elif bp:
+ # XXX Not sure the following is always true:
+ # The previous record is not for this version, yet we
+ # have a backpointer to it. The current record must
+ # be an undo of an abort or commit, so the backpointer
+ # must be to a version record with a pnv.
+ h2 = self._read_data_header(bp, oid)
+ if h2.version:
+ return h2.pnv
+ else:
+ warn("restore could not find previous non-version data "
+ "at %d or %d", prev, bp)
+ return None
+
+ def _resolve_backpointer(self, prev_txn, oid, data):
+ prev_pos = 0
+ if prev_txn is not None:
+ prev_txn_pos = self._txn_find(prev_txn, 0)
+ if prev_txn_pos:
+ prev_pos = self._data_find(prev_txn_pos, oid, data)
+ return prev_pos
+
+ def copy(self, oid, serial, data, refs, version, prev_txn,
+ txnpos, datapos):
+ prev_pos = self._resolve_backpointer(prev_txn, oid, data)
+ old = self._index.get(oid, 0)
+ # Calculate the pos the record will have in the storage.
+ here = datapos
+ # And update the temp file index
+ self._tindex[oid] = here
+ if prev_pos:
+ # If there is a valid prev_pos, don't write data.
+ data = None
+ if data is None:
+ dlen = 0
+ refs = []
+ else:
+ dlen = len(data)
+ # Write the recovery data record
+ h = DataHeader(oid, serial, old, txnpos, len(version), len(refs), dlen)
+ if version:
+ h.version = version
+ pnv = self._restore_pnv(oid, old, version, prev_pos)
+ if pnv is not None:
+ h.pnv = pnv
+ else:
+ h.pnv = old
+ # Link to the last record for this version
+ h.vprev = self._tvindex.get(version, 0)
+ if not h.vprev:
+ h.vprev = self._vindex.get(version, 0)
+ self._tvindex[version] = here
+
+ self._tfile.write(h.asString())
+ self._tfile.write(''.join(refs))
+ # Write the data or a backpointer
+ if data is None:
+ if prev_pos:
+ self._tfile.write(p64(prev_pos))
+ else:
+ # Write a zero backpointer, which indicates an
+ # un-creation transaction.
+ self._tfile.write(ZERO)
+ else:
+ self._tfile.write(data)
+
+
=== Zope3/src/zodb/storage/file/__init__.py 1.1 => 1.2 ===
--- /dev/null Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/__init__.py Tue Apr 22 11:23:12 2003
@@ -0,0 +1 @@
+from zodb.storage.file.main import FileStorage