[Zodb-checkins] CVS: Zope3/src/zodb/storage/file - pack.py:1.2 main.py:1.2 index.py:1.2 format.py:1.2 errors.py:1.2 dump.py:1.2 copy.py:1.2 __init__.py:1.2

Jeremy Hylton jeremy at zope.com
Tue Apr 22 12:23:46 EDT 2003


Update of /cvs-repository/Zope3/src/zodb/storage/file
In directory cvs.zope.org:/tmp/cvs-serv13741/file

Added Files:
	pack.py main.py index.py format.py errors.py dump.py copy.py 
	__init__.py 
Log Message:
Merge the jeremy-new-pack-branch to the trunk.

The primary change is a completely new implementation of file storage pack.


=== Zope3/src/zodb/storage/file/pack.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/pack.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,457 @@
+"""FileStorage helper to find all reachable object as of a certain time.
+
+A storage contains an ordered set of object revisions.  When a storage
+is packed, object revisions that are not reachable as of the pack time
+are deleted.  The notion of reachability is complicated by
+backpointers -- object revisions that point to earlier revisions of
+the same object.
+
+An object revision is reachable at a certain time if it is reachable
+from the revision of the root at that time or if it is reachable from
+a backpointer after that time.
+"""
+
+import logging
+import os
+import stat
+
+from zodb.interfaces import ZERO, _fmt_oid
+from zodb.utils import p64, u64
+from zodb.storage.base import splitrefs
+from zodb.storage.file.copy import DataCopier
+from zodb.storage.file.errors import CorruptedError
+from zodb.storage.file.format import FileStorageFormatter
+from zodb.storage.file.index import fsIndex
+
+logger = logging.getLogger("zodb.storage.file")
+
+class GC(FileStorageFormatter):
+
+    def __init__(self, file, eof, packtime):
+        self._file = file
+        self._name = file.name
+        self.eof = eof
+        self.packtime = packtime
+        # packpos: position of first txn header after pack time
+        self.packpos = None
+        self.oid2curpos = {} # maps oid to current data record position
+        self.oid2verpos = {} # maps oid to position of current version data
+
+        # The set of reachable revisions of each object.
+        #
+        # This set is managed using two data structures.  The first is
+        # an fsIndex mapping oids to one data record pos.  Since only
+        # a few objects will have more than one revision, we use this
+        # efficient data structure to handle the common case.  The
+        # second is a dictionary mapping oids to lists of
+        # positions; it is used to handle the small number of objects
+        # for which we must keep multiple revisions.
+        
+        self.reachable = fsIndex()
+        self.reach_ex = {}
+
+        # keep ltid for consistency checks during initial scan
+        self.ltid = ZERO
+
+    def isReachable(self, oid, pos):
+        """Return True if revision of `oid` at `pos` is reachable."""
+
+        rpos = self.reachable.get(oid)
+        if rpos is None:
+            return False
+        if rpos == pos:
+            return True
+        return pos in self.reach_ex.get(oid, [])
+
+    def findReachable(self):
+        self.buildPackIndex()
+        self.findReachableAtPacktime([ZERO])
+        self.findReachableFromFuture()
+
+    def buildPackIndex(self):
+        pos = 1024
+        while pos < self.eof:
+            th = self._read_txn_header(pos)
+            if th.tid > self.packtime:
+                break
+            self.checkTxn(th, pos)
+
+            tpos = pos
+            end = pos + th.tlen
+            pos += th.headerlen()
+
+            while pos < end:
+                dh = self._read_data_header(pos)
+                self.checkData(th, tpos, dh, pos)
+                if dh.version:
+                    self.oid2verpos[dh.oid] = pos
+                else:
+                    self.oid2curpos[dh.oid] = pos
+                pos += dh.recordlen()
+
+            tlen = self._read_num(pos)
+            if tlen != th.tlen:
+                self.fail(pos, "redundant transaction length does not "
+                          "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+            pos += 8
+
+        self.packpos = pos
+
+    def findReachableAtPacktime(self, roots):
+        """Mark all objects reachable from the oids in roots as reachable."""
+        todo = list(roots)
+        while todo:
+            oid = todo.pop()
+            if oid in self.reachable:
+                continue
+
+            L = []
+
+            pos = self.oid2curpos.get(oid)
+            if pos is not None:
+                L.append(pos)
+                todo.extend(self.findrefs(pos))
+
+            pos = self.oid2verpos.get(oid)
+            if pos is not None:
+                L.append(pos)
+                todo.extend(self.findrefs(pos))
+
+            if not L:
+                continue
+
+            pos = L.pop()
+            self.reachable[oid] = pos
+            if L:
+                self.reach_ex[oid] = L
+
+    def findReachableFromFuture(self):
+        # In this pass, the roots are positions of object revisions.
+        # We add a pos to extra_roots when there is a backpointer to a
+        # revision that was not current at the packtime.  The
+        # non-current revision could refer to objects that were
+        # otherwise unreachable at the packtime.
+        extra_roots = []
+        
+        pos = self.packpos
+        while pos < self.eof:
+            th = self._read_txn_header(pos)
+            self.checkTxn(th, pos)
+            tpos = pos
+            end = pos + th.tlen
+            pos += th.headerlen()
+
+            while pos < end:
+                dh = self._read_data_header(pos)
+                self.checkData(th, tpos, dh, pos)
+
+                if dh.back and dh.back < self.packpos:
+                    if dh.oid in self.reachable:
+                        L = self.reach_ex.setdefault(dh.oid, [])
+                        if dh.back not in L:
+                            L.append(dh.back)
+                            extra_roots.append(dh.back)
+                    else:
+                        self.reachable[dh.oid] = dh.back
+
+                if dh.version:
+                    if dh.oid in self.reachable:
+                        L = self.reach_ex.setdefault(dh.oid, [])
+                        if dh.pnv not in L:
+                            L.append(dh.pnv)
+                            extra_roots.append(dh.pnv)
+                    else:
+                        self.reachable[dh.oid] = dh.back
+                        
+                pos += dh.recordlen()
+
+            tlen = self._read_num(pos)
+            if tlen != th.tlen:
+                self.fail(pos, "redundant transaction length does not "
+                          "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+            pos += 8
+
+        for pos in extra_roots:
+            refs = self.findrefs(pos)
+            self.findReachableAtPacktime(refs)
+
+    def findrefs(self, pos):
+        """Return a list of oids referenced as of packtime."""
+        dh = self._read_data_header(pos)
+        # Chase backpointers until we get to the record with the refs
+        while dh.back:
+            dh = self._read_data_header(dh.back)
+        return splitrefs(self._file.read(dh.nrefs * 8))
+
+class PackCopier(DataCopier):
+
+    # PackCopier has to cope with _file and _tfile being the
+    # same file.  The copy() implementation is written assuming
+    # that they are different, so that using one object doesn't
+    # mess up the file pointer for the other object.
+
+    # PackCopier overrides _resolve_backpointer() and _restore_pnv()
+    # to guarantee that they keep the file pointer for _tfile in
+    # the right place.
+
+    def __init__(self, f, index, vindex, tindex, tvindex):
+        self._file = f
+        self._tfile = f
+        self._index = index
+        self._vindex = vindex
+        self._tindex = tindex
+        self._tvindex = tvindex
+        self._pos = None
+
+    def setTxnPos(self, pos):
+        self._pos = pos
+
+    def _resolve_backpointer(self, prev_txn, oid, data):
+        pos = self._tfile.tell()
+        try:
+            return DataCopier._resolve_backpointer(self, prev_txn, oid, data)
+        finally:
+            self._tfile.seek(pos)
+
+    def _restore_pnv(self, oid, prev, version, bp):
+        pos = self._tfile.tell()
+        try:
+            return DataCopier._restore_pnv(self, oid, prev, version, bp)
+        finally:
+            self._tfile.seek(pos)
+
+class FileStoragePacker(FileStorageFormatter):
+
+    def __init__(self, path, stop, la, lr, cla, clr):
+        self._name = path
+        self._file = open(path, "rb")
+        self._stop = stop
+        self._packt = None
+        self.locked = False
+        self.file_end = os.stat(self._file.name)[stat.ST_SIZE]
+        
+        self.gc = GC(self._file, self.file_end, self._stop)
+
+        # The packer needs to acquire the parent's commit lock
+        # during the copying stage, so the two sets of lock acquire
+        # and release methods are passed to the constructor.
+        self._lock_acquire = la
+        self._lock_release = lr
+        self._commit_lock_acquire = cla
+        self._commit_lock_release = clr
+
+        # The packer will use several indexes.
+        # index: oid -> pos
+        # vindex: version -> pos of XXX
+        # tindex: oid -> pos, for current txn
+        # tvindex: version -> pos of XXX, for current txn
+        
+        self.index = fsIndex()
+        self.vindex = {}
+        self.tindex = {}
+        self.tvindex = {}
+
+        # Index for non-version data.  This is a temporary structure
+        # to reduce I/O during packing
+        self.nvindex = fsIndex()
+
+    def pack(self):
+        # Pack copies all data reachable at the pack time or later.
+        #
+        # Copying occurs in two phases.  In the first phase, txns
+        # before the pack time are copied if they contain any reachable
+        # data.  In the second phase, all txns after the pack time
+        # are copied.
+        #
+        # Txn and data records contain pointers to previous records.
+        # Because these pointers are stored as file offsets, they
+        # must be updated when we copy data.
+        
+        # XXX Need to add sanity checking to pack
+
+        self.gc.findReachable()
+
+        # Setup the destination file and copy the metadata.
+        # XXX rename from _tfile to something clearer
+        self._tfile = open(self._name + ".pack", "w+b")
+        self._file.seek(0)
+        self._tfile.write(self._file.read(self._metadata_size))
+
+        self._copier = PackCopier(self._tfile, self.index, self.vindex,
+                                  self.tindex, self.tvindex)
+
+        ipos, opos = self.copyToPacktime()
+        assert ipos == self.gc.packpos
+        if ipos == opos:
+            # pack didn't free any data.  there's no point in continuing.
+            self._tfile.close()
+            os.remove(self._name + ".pack")
+            return None
+        if ipos < self.file_end:
+            self.copyRest(ipos)
+
+        # OK, we've copied everything. Now we need to wrap things up.
+        pos = self._tfile.tell()
+        self._tfile.flush()
+        self._tfile.close()
+        self._file.close()
+
+        return pos
+
+    def copyToPacktime(self):
+        offset = 0L  # the amount of space freed by packing
+        pos = self._metadata_size
+        new_pos = pos
+
+        while pos < self.gc.packpos:
+            th = self._read_txn_header(pos)
+            new_tpos, pos = self.copyDataRecords(pos, th)
+
+            if new_tpos:
+                new_pos = self._tfile.tell() + 8
+                tlen = new_pos - new_tpos - 8
+                # Update the transaction length
+                self._tfile.seek(new_tpos + 8)
+                self._tfile.write(p64(tlen))
+                self._tfile.seek(new_pos - 8)
+                self._tfile.write(p64(tlen))
+
+            
+            tlen = self._read_num(pos)
+            if tlen != th.tlen:
+                self.fail(pos, "redundant transaction length does not "
+                          "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+            pos += 8
+
+        return pos, new_pos
+
+    def fetchBackpointer(self, oid, back):
+        """Return data and refs backpointer `back` to object `oid.
+
+        If `back` is 0 or ultimately resolves to 0, return None
+        and None.  In this case, the transaction undoes the object
+        creation.
+        """
+        if back == 0:
+            return None, None
+        data, refs, serial, tid = self._loadBackTxn(oid, back, False)
+        return data, refs
+
+    def copyDataRecords(self, pos, th):
+        """Copy any current data records between pos and tend.
+
+        Returns position of txn header in output file and position
+        of next record in the input file.
+        
+        If any data records are copied, also write txn header (th).
+        """
+        copy = False
+        new_tpos = 0
+        tend = pos + th.tlen
+        pos += th.headerlen()
+        while pos < tend:
+            h = self._read_data_header(pos)
+            if not self.gc.isReachable(h.oid, pos):
+                pos += h.recordlen()
+                continue
+            pos += h.recordlen()
+
+            # If we are going to copy any data, we need to copy
+            # the transaction header.  Note that we will need to
+            # patch up the transaction length when we are done.
+            if not copy:
+                th.status = "p"
+                s = th.asString()
+                new_tpos = self._tfile.tell()
+                self._tfile.write(s)
+                new_pos = new_tpos + len(s)
+                copy = True
+
+            if h.plen:
+                refs = self._file.read(8 * h.nrefs)
+                data = self._file.read(h.plen)
+            else:
+                # If a current record has a backpointer, fetch
+                # refs and data from the backpointer.  We need
+                # to write the data in the new record.
+                data, refs = self.fetchBackpointer(h.oid, h.back)
+                if refs is not None:
+                    refs = "".join(refs)
+
+            self.writePackedDataRecord(h, data, refs, new_tpos)
+            new_pos = self._tfile.tell()
+
+        return new_tpos, pos
+
+    def writePackedDataRecord(self, h, data, refs, new_tpos):
+        # Update the header to reflect current information, then write
+        # it to the output file.
+        if data is None:
+            data = ""
+        if refs is None:
+            refs = ""
+        h.prev = 0
+        h.back = 0
+        h.plen = len(data)
+        h.nrefs = len(refs) / 8
+        h.tloc = new_tpos
+        pos = self._tfile.tell()
+        if h.version:
+            h.pnv = self.index.get(h.oid, 0)
+            h.vprev = self.vindex.get(h.version, 0)
+            self.vindex[h.version] = pos
+        self.index[h.oid] = pos
+        self._tfile.write(h.asString())
+        self._tfile.write(refs)
+        self._tfile.write(data)
+        if not data:
+            # Packed records never have backpointers (?).
+            # If there is no data, write a ZERO backpointer.
+            # This is a George Bailey event.
+            self._tfile.write(ZERO)
+
+    def copyRest(self, ipos):
+        # After the pack time, all data records are copied.
+        # Copy one txn at a time, using copy() for data.
+
+        while ipos < self.file_end:
+            th = self._read_txn_header(ipos)
+            pos = self._tfile.tell()
+            self._copier.setTxnPos(pos)
+            self._tfile.write(th.asString())
+            tend = ipos + th.tlen
+            ipos += th.headerlen()
+
+            while ipos < tend:
+                h = self._read_data_header(ipos)
+                ipos += h.recordlen()
+                if h.nrefs:
+                    refs = splitrefs(self._file.read(h.nrefs * 8))
+                else:
+                    refs = []
+                prev_txn = None
+                if h.plen:
+                    data = self._file.read(h.plen)
+                else:
+                    data, refs = self.fetchBackpointer(h.oid, h.back)
+                    if h.back:
+                        prev_txn = self.getTxnFromData(h.oid, h.back)
+
+                self._copier.copy(h.oid, h.serial, data, refs, h.version,
+                                  prev_txn, pos, self._tfile.tell())
+
+            tlen = self._tfile.tell() - pos
+            assert tlen == th.tlen
+            self._tfile.write(p64(tlen))
+            ipos += 8
+
+            self.index.update(self.tindex)
+            self.tindex.clear()
+            self.vindex.update(self.tvindex)
+            self.tvindex.clear()
+
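
The GC class is essentially the mark phase of a mark-and-sweep
collector, complicated by backpointers and versions.  A minimal sketch
of the core marking loop, using plain dicts in place of fsIndex and
hypothetical revs/refs mappings (none of these names come from the
module itself):

    def mark_reachable(roots, revs, refs):
        # revs: oid -> list of data record positions for that oid
        # refs: position -> list of oids referenced by that record
        reachable = {}
        todo = list(roots)
        while todo:
            oid = todo.pop()
            if oid in reachable:
                continue              # already marked; handles cycles
            positions = revs.get(oid)
            if not positions:
                continue
            reachable[oid] = positions
            for pos in positions:
                todo.extend(refs.get(pos, []))
        return reachable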


=== Zope3/src/zodb/storage/file/main.py 1.1 => 1.2 === (1142/1242 lines abridged)
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/main.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,1239 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""File-based ZODB storage
+
+$Id$
+"""
+
+from __future__ import generators
+
+import os
+import sys
+import time
+import errno
+import base64
+import struct
+import logging
+from struct import pack, unpack
+from cPickle import Pickler, Unpickler, loads
+
+try:
+    from posix import fsync
+except ImportError:
+    fsync = None
+
+import zodb.db
+from zodb.storage.base import BaseStorage, splitrefs
+from zodb import conflict
+from zodb.interfaces import *
+from zodb.timestamp import TimeStamp, newTimeStamp, timeStampFromTime
+from zodb.lockfile import LockFile
+from zodb.utils import p64, u64, cp
+from zodb.storage.file.index import fsIndex
+from zodb.storage.interfaces import *
+
+from zodb.storage.file.copy import DataCopier
+from zodb.storage.file.errors import *

[-=- -=- -=- 1142 lines omitted -=- -=- -=-]

+        self.first = first
+        self.last = last
+        self.filter = filter
+        self.i = 0
+        self.results = []
+        self.stop = 0
+
+    def finished(self):
+        """Return True if UndoSearch has found enough records."""
+        # The first txn record is at 1024, so pos must be >= 1024
+        return self.i >= self.last or self.pos < 1024 or self.stop
+
+    def search(self):
+        """Search for another record."""
+        dict = self._readnext()
+        if dict is None:
+            return
+        if self.filter is None or self.filter(dict):
+            if self.i >= self.first:
+                self.results.append(dict)
+            self.i += 1
+
+    def _readnext(self):
+        """Read the next record from the storage."""
+        self._file.seek(self.pos - 8)
+        self.pos -= u64(self._file.read(8)) + 8
+        if self.pos < 1024:
+            return None
+        h = self._read_txn_header(self.pos)
+        if h.tid < self.packt or h.status == 'p':
+            self.stop = 1
+            return None
+        assert h.status == " "
+        d = {'id': base64.encodestring(h.tid).rstrip(),
+             'time': TimeStamp(h.tid).timeTime(),
+             'user_name': h.user,
+             'description': h.descr}
+        if h.ext:
+            ext = loads(h.ext)
+            d.update(ext)
+        return d
+
+def cleanup(filename):
+    """Remove all FileStorage related files."""
+    for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
+        try:
+            os.remove(filename + ext)
+        except OSError, e:
+            if e.errno != errno.ENOENT:
+                raise
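
The _readnext() method above shows the backward scan that the
redundant trailing transaction length makes possible.  A
self-contained sketch of the same technique (the function name is
illustrative; the 1024-byte metadata offset comes from the format
description in format.py):

    import struct

    def iter_txn_positions_backward(f, end):
        # Walk transaction records from `end` back toward offset 1024,
        # using the redundant 8-byte length that trails each record.
        pos = end
        while pos > 1024:
            f.seek(pos - 8)
            tlen = struct.unpack(">Q", f.read(8))[0]
            pos -= tlen + 8           # back over the trailer and record
            if pos < 1024:
                return                # corrupt length; give up
            yield pos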


=== Zope3/src/zodb/storage/file/index.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/index.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,128 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Implement an OID to File-position (long integer) mapping."""
+
+# To save space, we do two things:
+#
+#     1. We split the keys (OIDs) into 6-byte prefixes and 2-byte suffixes.
+#        We use the prefixes as keys in a mapping from prefix to mappings
+#        of suffix to data:
+#
+#           data is  {prefix -> {suffix -> data}}
+#
+#     2. We limit the data size to 48 bits. This should allow databases
+#        as large as 256 terabytes.
+#
+# Most of the space is consumed by items in the mappings from 2-byte
+# suffix to 6-byte data. This should reduce the overall memory usage to
+# 8-16 bytes per OID.
+#
+# Since the mapping from suffix to data contains at most 65536 entries,
+# we use a BTree bucket instead of a full BTree to store the results.
+#
+# We use p64 to convert integers to 8-byte strings and lop off the two
+# high-order bytes when saving. On loading data, we add the leading
+# bytes back before using u64 to convert the data back to (long)
+# integers.
+
+from __future__ import generators
+import struct
+
+from zodb.btrees._fsBTree import fsBucket
+
+# convert between numbers and six-byte strings
+
+def num2str(n):
+    return struct.pack(">Q", n)[2:]
+
+def str2num(s):
+    return struct.unpack(">Q", "\000\000" + s)[0]
+
+class fsIndex:
+
+    def __init__(self):
+        self._data = {}
+
+    def __getitem__(self, key):
+        return str2num(self._data[key[:6]][key[6:]])
+
+    def get(self, key, default=None):
+        tree = self._data.get(key[:6], default)
+        if tree is default:
+            return default
+        v = tree.get(key[6:], default)
+        if v is default:
+            return default
+        return str2num(v)
+
+    def __setitem__(self, key, value):
+        value = num2str(value)
+        treekey = key[:6]
+        tree = self._data.get(treekey)
+        if tree is None:
+            tree = fsBucket()
+            self._data[treekey] = tree
+        tree[key[6:]] = value
+
+    def __len__(self):
+        r = 0
+        for tree in self._data.values():
+            r += len(tree)
+        return r
+
+    def update(self, mapping):
+        for k, v in mapping.items():
+            self[k] = v
+
+    def has_key(self, key):
+        v = self.get(key, self)
+        return v is not self
+
+    def __contains__(self, key):
+        tree = self._data.get(key[:6])
+        if tree is None:
+            return False
+        v = tree.get(key[6:], None)
+        if v is None:
+            return False
+        return True
+
+    def clear(self):
+        self._data.clear()
+
+    def __iter__(self):
+        for prefix, tree in self._data.items():
+            for suffix in tree:
+                yield prefix + suffix
+
+    def keys(self):
+        r = []
+        for prefix, tree in self._data.items():
+            for suffix in tree.keys():
+                r.append(prefix + suffix)
+        return r
+
+    def items(self):
+        r = []
+        for prefix, tree in self._data.items():
+            for suffix, v in tree.items():
+                r.append(((prefix + suffix), str2num(v)))
+        return r
+
+    def values(self):
+        r = []
+        for prefix, tree in self._data.items():
+            for v in tree.values():
+                r.append(str2num(v))
+        return r


=== Zope3/src/zodb/storage/file/format.py 1.1 => 1.2 === (458/558 lines abridged)
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/format.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,555 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tools for working with the low-level FileStorage format.
+
+Files are arranged as follows.
+
+  - The first 1024 bytes are a storage metadata section.
+
+  The first two bytes are the characters F and S.
+  The next two bytes are a storage format version id, currently 43.
+  The next four bytes are the database version string.
+
+  The rest of the section is reserved.
+
+A transaction record consists of:
+
+  - 8-byte transaction id, which is also a time stamp.
+
+  - 8-byte transaction record length - 8.
+
+  - 1-byte status code
+
+  - 2-byte length of user name
+
+  - 2-byte length of description
+
+  - 2-byte length of extension attributes
+
+  -   user name
+
+  -   description
+
+  -   extension attributes
+
+  * A sequence of data records
+

[-=- -=- -=- 458 lines omitted -=- -=- -=-]

+                        self.tloc, self.vlen, self.nrefs, self.plen)
+        if self.version:
+            v = struct.pack(">QQ", self.pnv, self.vprev)
+            return s + v + self.version
+        else:
+            return s
+
+    def setVersion(self, version, pnv, vprev):
+        self.version = version
+        self.vlen = len(version)
+        self.pnv = pnv
+        self.vprev = vprev
+
+    def parseVersion(self, buf):
+        self.pnv, self.vprev = struct.unpack(">QQ", buf[:16])
+        self.version = buf[16:]
+
+    def recordlen(self):
+        rlen = DATA_HDR_LEN + (self.nrefs * 8) + (self.plen or 8)
+        if self.version:
+            rlen += 16 + self.vlen
+        return rlen
+
+class TxnHeader:
+    """Header for a transaction record."""
+
+    __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
+                 "ulen", "dlen", "elen")
+
+    def __init__(self, tid, tlen, status, ulen, dlen, elen):
+        self.tid = tid
+        self.tlen = tlen
+        self.status = status
+        self.ulen = ulen
+        self.dlen = dlen
+        self.elen = elen
+
+    def fromString(cls, s):
+        return cls(*struct.unpack(TRANS_HDR, s))
+
+    fromString = classmethod(fromString)
+
+    def asString(self):
+        s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
+                        self.ulen, self.dlen, self.elen)
+        return "".join([s, self.user, self.descr, self.ext])
+
+    def headerlen(self):
+        return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
+


=== Zope3/src/zodb/storage/file/errors.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/errors.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,40 @@
+"""FileStorage-specific exceptions."""
+
+from zodb.interfaces import _fmt_oid
+from zodb.storage.interfaces import StorageError, StorageSystemError
+
+class FileStorageError(StorageError):
+    pass
+
+class PackError(FileStorageError):
+    pass
+
+class FileStorageFormatError(FileStorageError):
+    """Invalid file format
+
+    The format of the given file is not valid.
+    """
+
+class CorruptedError(FileStorageError, StorageSystemError):
+    """Corrupted file storage."""
+
+class CorruptedDataError(CorruptedError):
+
+    def __init__(self, oid=None, buf=None, pos=None):
+        self.oid = oid
+        self.buf = buf
+        self.pos = pos
+
+    def __str__(self):
+        if self.oid:
+            msg = "Error reading oid %s.  Found %r" % (_fmt_oid(self.oid),
+                                                       self.buf)
+        else:
+            msg = "Error reading unknown oid.  Found %r" % self.buf
+        if self.pos:
+            msg += " at %d" % self.pos
+        return msg
+
+class FileStorageQuotaError(FileStorageError, StorageSystemError):
+    """File storage quota exceeded."""
+
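
What CorruptedDataError reports, as derived from its __str__ method:

    err = CorruptedDataError(buf="bad header", pos=4096)
    assert str(err) == "Error reading unknown oid.  Found 'bad header' at 4096"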


=== Zope3/src/zodb/storage/file/dump.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/dump.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,108 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A low-level utility to dump the internal FileStorage representation."""
+
+import struct
+from zodb.storage.file.format \
+     import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
+from zodb.utils import u64
+from zodb.storage.base import splitrefs
+from zodb.storage.tests.base import zodb_unpickle
+
+def fmt(p64):
+    # Return a nicely formatted string for a packed 64-bit value
+    return "%016x" % u64(p64)
+
+def dump(path, dest=None):
+    Dumper(path, dest).dump()
+
+class Dumper:
+    """A very verbose dumper for debugging FileStorage problems."""
+
+    def __init__(self, path, dest=None):
+        self.file = open(path, "rb")
+        self.dest = dest
+
+    def dump(self):
+        fid = self.file.read(1024)
+        print >> self.dest, "*" * 60
+        print >> self.dest, "file identifier: %r" % fid[:4]
+        print >> self.dest, "database version: %r" % fid[4:8]
+        # XXX perhaps verify that the rest of the metadata is nulls?
+        while self.dump_txn():
+            pass
+
+    def dump_txn(self):
+        pos = self.file.tell()
+        h = self.file.read(TRANS_HDR_LEN)
+        if not h:
+            return False
+        tid, tlen, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
+        end = pos + tlen
+        print >> self.dest, "=" * 60
+        print >> self.dest, "offset: %d" % pos
+        print >> self.dest, "end pos: %d" % end
+        print >> self.dest, "transaction id: %s" % fmt(tid)
+        print >> self.dest, "trec len: %d" % tlen
+        print >> self.dest, "status: %r" % status
+        user = descr = extra = ""
+        if ul:
+            user = self.file.read(ul)
+        if dl:
+            descr = self.file.read(dl)
+        if el:
+            extra = self.file.read(el)
+        print >> self.dest, "user: %r" % user
+        print >> self.dest, "description: %r" % descr
+        print >> self.dest, "len(extra): %d" % el
+        while self.file.tell() < end:
+            self.dump_data(pos)
+        tlen2 = u64(self.file.read(8))
+        print >> self.dest, "redundant trec len: %d" % tlen2
+        return True
+
+    def dump_data(self, tloc):
+        pos = self.file.tell()
+        h = self.file.read(DATA_HDR_LEN)
+        assert len(h) == DATA_HDR_LEN
+        oid, revid, prev, tloc, vlen, nrefs, dlen = struct.unpack(DATA_HDR, h)
+        print >> self.dest, "-" * 60
+        print >> self.dest, "offset: %d" % pos
+        print >> self.dest, "oid: %s" % fmt(oid)
+        print >> self.dest, "revid: %s" % fmt(revid)
+        print >> self.dest, "previous record offset: %d" % prev
+        print >> self.dest, "transaction offset: %d" % tloc
+        if vlen:
+            pnv = self.file.read(8)
+            sprevdata = self.file.read(8)
+            version = self.file.read(vlen)
+            print >> self.dest, "version: %r" % version
+            print >> self.dest, "non-version data offset: %d" % u64(pnv)
+            print >> self.dest, \
+                  "previous version data offset: %d" % u64(sprevdata)
+        print >> self.dest, 'numrefs:', nrefs
+        for ref in splitrefs(self.file.read(nrefs * 8)):
+            print >> self.dest, '\t%s' % fmt(ref)
+        print >> self.dest, "len(data): %d" % dlen
+        data = self.file.read(dlen)
+        # A debugging feature for use with the test suite.
+        if data.startswith("(czodb.storage.tests.minpo\nMinPO\n"):
+            print >> self.dest, "value: %r" % zodb_unpickle(data).value
+        if not dlen:
+            sbp = self.file.read(8)
+            print >> self.dest, "backpointer: %d" % u64(sbp)
+
+if __name__ == "__main__":
+    import sys
+    Dumper(sys.argv[1]).dump()
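
Typical use, from a shell or programmatically (the path below is
illustrative):

    $ python dump.py /path/to/Data.fs

    # or, from Python; with dest=None the report goes to stdout:
    from zodb.storage.file.dump import dump
    dump("/path/to/Data.fs")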


=== Zope3/src/zodb/storage/file/copy.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/copy.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1,157 @@
+"""Create copy of a data record."""
+
+import logging
+
+from zodb.interfaces import MAXTID, ZERO, UndoError
+from zodb.utils import p64, u64
+from zodb.storage.file.format import FileStorageFormatter, DataHeader
+from zodb.storage.file.format \
+     import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR_LEN, DATA_VERSION_HDR_LEN
+
+logger = logging.getLogger("zodb.storage.file")
+
+class DataCopier(FileStorageFormatter):
+    """Mixin class for copying transactions into a storage.
+
+    The restore() and pack() methods share a need to copy data records
+    and update pointers to data in earlier transaction records.  This
+    class provides the shared logic.
+
+    The mixin extends the FileStorageFormatter with a copy() method.
+    It also requires that the concrete class provides the following
+    attributes:
+
+    _file -- file with earlier destination data
+    _tfile -- destination file for copied data
+    _packt -- p64() representation of latest pack time
+    _pos -- file pos of destination transaction
+    _index -- maps oid to data record file pos
+    _vindex -- maps version name to data record file pos
+    _tindex -- maps oid to data record file pos, for the current txn
+    _tvindex -- maps version name to data record file pos, for the current txn
+
+    _tindex and _tvindex are updated by copy().
+
+    The copy() method does not do any locking.
+    """
+
+    def _txn_find(self, tid, stop_at_pack):
+        # _pos always points just past the last transaction
+        pos = self._pos
+        while pos > 1024:
+            self._file.seek(pos - 8)
+            pos = pos - u64(self._file.read(8)) - 8
+            self._file.seek(pos)
+            h = self._file.read(TRANS_HDR_LEN)
+            _tid = h[:8]
+            if _tid == tid:
+                return pos
+            if stop_at_pack:
+                # check the status field of the transaction header
+                # XXX _packt seems to be either ZERO or None
+                if h[16] == 'p' or _tid < self._packt:
+                    break
+        raise UndoError(None, "Invalid transaction id")
+
+    def _data_find(self, tpos, oid, data):
+        # Return backpointer to oid in a data record in the txn at tpos.
+        # It should contain a pickle identical to data. Returns 0 on failure.
+        # Must call with lock held.
+        h = self._read_txn_header(tpos)
+        tend = tpos + h.tlen
+        pos = self._file.tell()
+        while pos < tend:
+            h = self._read_data_header(pos)
+            if h.oid == oid:
+                # Read past any references data
+                self._file.read(h.nrefs * 8)
+                # Make sure this looks like the right data record
+                if h.plen == 0:
+                    # This is also a backpointer.  Gotta trust it.
+                    return pos
+                if h.plen != len(data):
+                    # The expected data doesn't match what's in the
+                    # backpointer.  Something is wrong.
+                    error("Mismatch between data and backpointer at %d", pos)
+                    return 0
+                _data = self._file.read(h.plen)
+                if data != _data:
+                    return 0
+                return pos
+            pos += h.recordlen()
+        return 0
+    
+    def _restore_pnv(self, oid, prev, version, bp):
+        # Find a valid pnv (previous non-version) pointer for this version.
+
+        # If there is no previous record, there can't be a pnv.
+        if not prev:
+            return None
+
+        pnv = None
+        h = self._read_data_header(prev, oid)
+        # If the previous record is for a version, it must have
+        # a valid pnv.
+        if h.version:
+            return h.pnv
+        elif bp:
+            # XXX Not sure the following is always true:
+            # The previous record is not for this version, yet we
+            # have a backpointer to it.  The current record must
+            # be an undo of an abort or commit, so the backpointer
+            # must be to a version record with a pnv.
+            h2 = self._read_data_header(bp, oid)
+            if h2.version:
+                return h2.pnv
+            else:
+                warn("restore could not find previous non-version data "
+                     "at %d or %d", prev, bp)
+                return None
+
+    def _resolve_backpointer(self, prev_txn, oid, data):
+        prev_pos = 0
+        if prev_txn is not None:
+            prev_txn_pos = self._txn_find(prev_txn, 0)
+            if prev_txn_pos:
+                prev_pos = self._data_find(prev_txn_pos, oid, data)
+        return prev_pos
+
+    def copy(self, oid, serial, data, refs, version, prev_txn,
+             txnpos, datapos):
+        prev_pos = self._resolve_backpointer(prev_txn, oid, data)
+        old = self._index.get(oid, 0)
+        # Calculate the pos the record will have in the storage.
+        here = datapos
+        # And update the temp file index
+        self._tindex[oid] = here
+        if prev_pos:
+            # If there is a valid prev_pos, don't write data.
+            data = None
+        if data is None:
+            dlen = 0
+            refs = []
+        else:
+            dlen = len(data)
+        # Write the recovery data record
+        h = DataHeader(oid, serial, old, txnpos, len(version), len(refs), dlen)
+        if version:
+            h.version = version
+            pnv = self._restore_pnv(oid, old, version, prev_pos)
+            if pnv is not None:
+                h.pnv = pnv
+            else:
+                h.pnv = old
+            # Link to the last record for this version
+            h.vprev = self._tvindex.get(version, 0)
+            if not h.vprev:
+                h.vprev = self._vindex.get(version, 0)
+            self._tvindex[version] = here
+
+        self._tfile.write(h.asString())
+        self._tfile.write(''.join(refs))
+        # Write the data or a backpointer
+        if data is None:
+            if prev_pos:
+                self._tfile.write(p64(prev_pos))
+            else:
+                # Write a zero backpointer, which indicates an
+                # un-creation transaction.
+                self._tfile.write(ZERO)
+        else:
+            self._tfile.write(data)
+        
+    
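
PackCopier in pack.py is one concrete use of this mixin.  A minimal
skeleton of another, showing only the attributes the docstring
requires (the constructor arguments are illustrative):

    from zodb.storage.file.copy import DataCopier

    class ExampleCopier(DataCopier):

        def __init__(self, src, dst, packt, pos,
                     index, vindex, tindex, tvindex):
            self._file = src          # file to read earlier data from
            self._tfile = dst         # destination for copied data
            self._packt = packt       # p64() of the latest pack time
            self._pos = pos           # file pos of destination txn
            self._index = index       # oid -> data record file pos
            self._vindex = vindex     # version name -> file pos
            self._tindex = tindex     # like _index, for the current txn
            self._tvindex = tvindex   # like _vindex, for the current txn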


=== Zope3/src/zodb/storage/file/__init__.py 1.1 => 1.2 ===
--- /dev/null	Tue Apr 22 11:23:46 2003
+++ Zope3/src/zodb/storage/file/__init__.py	Tue Apr 22 11:23:12 2003
@@ -0,0 +1 @@
+from zodb.storage.file.main import FileStorage



