[Zodb-checkins] CVS: Zope3/src/zodb/storage/file - __init__.py:1.1.2.1 copy.py:1.1.2.1 dump.py:1.1.2.1 errors.py:1.1.2.1 format.py:1.1.2.1 index.py:1.1.2.1 main.py:1.1.2.1 pack.py:1.1.2.1

Jeremy Hylton jeremy at zope.com
Wed Apr 16 15:12:34 EDT 2003


Update of /cvs-repository/Zope3/src/zodb/storage/file
In directory cvs.zope.org:/tmp/cvs-serv5718/file

Added Files:
      Tag: jeremy-new-pack-branch
	__init__.py copy.py dump.py errors.py format.py index.py 
	main.py pack.py 
Log Message:
Reorganize FileStorage as a package.
Add new reachability code for pack.


=== Added File Zope3/src/zodb/storage/file/__init__.py ===
from zodb.storage.file.main import FileStorage


=== Added File Zope3/src/zodb/storage/file/copy.py ===
"""Create copy of a data record."""

from zodb.interfaces import MAXTID, ZERO
from zodb.utils import p64, u64
from zodb.storage.file.format import FileStorageFormatter, DataHeader
from zodb.storage.file.format \
     import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR_LEN, DATA_VERSION_HDR_LEN

class DataCopier(FileStorageFormatter):
    """Mixin class for copying transactions into a storage.

    The restore() and pack() methods share a need to copy data records
    and update pointers to data in earlier transaction records.  This
    class provides the shared logic.

    The mixin extends the FileStorageFormatter with a copy() method.
    It also requires that the concrete class provides the following
    attributes:

    _file -- file with earlier destination data
    _tfile -- destination file for copied data
    _packt -- p64() representation of latest pack time
    _pos -- file pos of destination transaction
    _index -- maps oid to data record file pos (read by copy())
    _vindex -- maps version name to data record file pos (read by copy())
    _tindex -- maps oid to data record file pos
    _tvindex -- maps version name to data record file pos

    _tindex and _tvindex are updated by copy().

    The copy() method does not do any locking.
    """

    def _txn_find(self, tid, stop_at_pack):
        """Return the file position of the transaction with id tid.

        The search walks backwards from self._pos, using the 8-byte
        length stored just before each position to find the start of
        the preceding transaction record.  If stop_at_pack is true the
        search gives up at the first transaction whose status byte is
        'p' or whose id is smaller than self._packt.

        Raises UndoError if no matching transaction is found.
        """
        # _pos always points just past the last transaction
        pos = self._pos
        while pos > 1024:
            self._file.seek(pos - 8)
            pos = pos - u64(self._file.read(8)) - 8
            self._file.seek(pos)
            h = self._file.read(TRANS_HDR_LEN)
            _tid = h[:8]
            if _tid == tid:
                return pos
            if stop_at_pack:
                # check the status field of the transaction header
                # XXX _packt seems to be either ZERO or None
                if h[16] == 'p' or _tid < self._packt:
                    break
        # NOTE(review): UndoError is not imported by this module, so this
        # line currently fails with NameError -- confirm which module
        # should provide it and add the import.
        raise UndoError(None, "Invalid transaction id")

    def _data_find(self, tpos, oid, data):
        """Return the position of oid's data record in the txn at tpos.

        The record should contain a pickle identical to data; a record
        with no pickle (plen == 0) is itself a backpointer and is
        accepted as is.  Returns 0 on failure.

        Must be called with the lock held.
        """
        h = self._read_txn_header(tpos)
        tend = tpos + h.tlen
        # presumably _read_txn_header() leaves the file positioned at the
        # first data record of the transaction -- TODO confirm
        pos = self._file.tell()
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid == oid:
                # Read past any references data
                self._file.read(h.nrefs * 8)
                # Make sure this looks like the right data record
                if h.plen == 0:
                    # This is also a backpointer.  Gotta trust it.
                    return pos
                if h.plen != len(data):
                    # The expected data doesn't match what's in the
                    # backpointer.  Something is wrong.
                    # This module has no module-level error() helper, so
                    # report through the stdlib logging package.
                    import logging
                    logging.getLogger("zodb.storage.file").error(
                        "Mismatch between data and backpointer at %d", pos)
                    return 0
                _data = self._file.read(h.plen)
                if data != _data:
                    return 0
                return pos
            pos += h.recordlen()
        return 0

    def _restore_pnv(self, oid, prev, version, bp):
        """Find a pnv (previous non-version) pointer for this version.

        prev is the position of the previous record for oid (false if
        there is none) and bp is the position of a backpointer target
        (false if there is none).  Returns the pnv taken from whichever
        of those records is a version record, or None if none is found.
        """
        # If there is no previous record, there can't be a pnv.
        if not prev:
            return None

        h = self._read_data_header(prev, oid)
        # If the previous record is for a version, it must have
        # a valid pnv.
        if h.version:
            return h.pnv
        elif bp:
            # XXX Not sure the following is always true:
            # The previous record is not for this version, yet we
            # have a backpointer to it.  The current record must
            # be an undo of an abort or commit, so the backpointer
            # must be to a version record with a pnv.
            h2 = self._read_data_header(bp, oid)
            if h2.version:
                return h2.pnv
            else:
                # This module has no module-level warn() helper, so
                # report through the stdlib logging package.
                import logging
                logging.getLogger("zodb.storage.file").warning(
                    "restore could not find previous non-version data "
                    "at %d or %d", prev, bp)
                return None
        # Previous record is non-version data and there is no backpointer.
        return None

    def _resolve_backpointer(self, prev_txn, oid, data):
        """Return the position of oid's record in transaction prev_txn.

        Returns 0 when prev_txn is None or when no matching data record
        is found in that transaction.
        """
        prev_pos = 0
        if prev_txn is not None:
            prev_txn_pos = self._txn_find(prev_txn, 0)
            if prev_txn_pos:
                prev_pos = self._data_find(prev_txn_pos, oid, data)
        return prev_pos

    def copy(self, oid, serial, data, refs, version, prev_txn,
             txnpos, datapos):
        """Write a data record for oid to the destination file.

        oid, serial -- identify the object revision being written
        data -- the pickle, or None if there is no data to write
        refs -- sequence of oid strings; joined and written after the header
        version -- version name string (empty for non-version data)
        prev_txn -- id of an earlier transaction that may already hold
            identical data, or None
        txnpos -- position of the enclosing transaction record
        datapos -- position the new data record will have

        If prev_txn resolves to a matching record, a backpointer to it
        is written in place of the data.  If data is None and nothing
        resolves, a zero backpointer is written.  self._tindex (and
        self._tvindex for version data) is updated with datapos.
        """
        prev_pos = self._resolve_backpointer(prev_txn, oid, data)
        old = self._index.get(oid, 0)
        # Calculate the pos the record will have in the storage.
        here = datapos
        # And update the temp file index
        self._tindex[oid] = here
        if prev_pos:
            # If there is a valid prev_pos, don't write data.
            data = None
        if data is None:
            dlen = 0
            refs = []
        else:
            dlen = len(data)
        # Write the recovery data record
        h = DataHeader(oid, serial, old, txnpos, len(version), len(refs), dlen)
        if version:
            h.version = version
            pnv = self._restore_pnv(oid, old, version, prev_pos)
            if pnv is not None:
                h.pnv = pnv
            else:
                h.pnv = old
            # Link to the last record for this version
            h.vprev = self._tvindex.get(version, 0)
            if not h.vprev:
                h.vprev = self._vindex.get(version, 0)
            self._tvindex[version] = here

        self._tfile.write(h.asString())
        self._tfile.write(''.join(refs))
        # Write the data or a backpointer
        if data is None:
            if prev_pos:
                self._tfile.write(p64(prev_pos))
            else:
                # Write a zero backpointer, which indicates an
                # un-creation transaction.
                self._tfile.write(ZERO)
        else:
            self._tfile.write(data)
        
    


=== Added File Zope3/src/zodb/storage/file/dump.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A low-level utility to dump the internal FileStorage representation."""

import struct
from zodb.storage.file.format \
     import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
from zodb.utils import u64
from zodb.storage.base import splitrefs
from zodb.storage.tests.base import zodb_unpickle

def fmt(p64):
    """Return a packed 64-bit value as a 16-digit, zero-padded hex string."""
    number = u64(p64)
    return "%016x" % number

def dump(path, dest=None):
    """Dump the FileStorage file at path to dest.

    dest is handed to Dumper unchanged; Dumper uses it as the target of
    its ``print >>`` statements, so None means standard output.
    """
    dumper = Dumper(path, dest)
    dumper.dump()

class Dumper:
    """A very verbose dumper for debugging FileStorage problems.

    The file at path is opened for binary reading when the Dumper is
    created and closed again when dump() finishes.  Output is written
    with ``print >> dest``, so a dest of None means standard output.
    """

    def __init__(self, path, dest=None):
        self.file = open(path, "rb")
        self.dest = dest

    def dump(self):
        """Print the metadata section and every transaction in the file.

        The underlying file is closed when the dump ends, whether it
        completes normally or raises.
        """
        try:
            fid = self.file.read(1024)
            print >> self.dest, "*" * 60
            print >> self.dest, "file identifier: %r" % fid[:4]
            print >> self.dest, "database version: %r" % fid[4:8]
            # XXX perhaps verify that the rest of the metadata is nulls?
            while self.dump_txn():
                pass
        finally:
            # Don't leak the file handle opened in __init__.
            self.file.close()

    def dump_txn(self):
        """Print one transaction record starting at the current position.

        Returns False when there is nothing left to read, True after a
        transaction (header, data records and trailing length) has been
        printed.
        """
        pos = self.file.tell()
        h = self.file.read(TRANS_HDR_LEN)
        if not h:
            return False
        tid, tlen, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
        end = pos + tlen
        print >> self.dest, "=" * 60
        print >> self.dest, "offset: %d" % pos
        print >> self.dest, "end pos: %d" % end
        print >> self.dest, "transaction id: %s" % fmt(tid)
        print >> self.dest, "trec len: %d" % tlen
        print >> self.dest, "status: %r" % status
        user = descr = extra = ""
        if ul:
            user = self.file.read(ul)
        if dl:
            descr = self.file.read(dl)
        if el:
            extra = self.file.read(el)
        print >> self.dest, "user: %r" % user
        print >> self.dest, "description: %r" % descr
        print >> self.dest, "len(extra): %d" % el
        while self.file.tell() < end:
            self.dump_data(pos)
        # The record length is stored a second time after the data records.
        tlen2 = u64(self.file.read(8))
        print >> self.dest, "redundant trec len: %d" % tlen2
        return True

    def dump_data(self, tloc):
        """Print one data record starting at the current file position.

        tloc is the offset of the enclosing transaction as seen by the
        caller.  It is not used for the output: the transaction offset
        that gets printed is the one stored in the record itself.
        """
        pos = self.file.tell()
        h = self.file.read(DATA_HDR_LEN)
        assert len(h) == DATA_HDR_LEN
        # Unpack into rec_tloc so the tloc argument is not clobbered.
        oid, revid, prev, rec_tloc, vlen, nrefs, dlen = \
             struct.unpack(DATA_HDR, h)
        print >> self.dest, "-" * 60
        print >> self.dest, "offset: %d" % pos
        print >> self.dest, "oid: %s" % fmt(oid)
        print >> self.dest, "revid: %s" % fmt(revid)
        print >> self.dest, "previous record offset: %d" % prev
        print >> self.dest, "transaction offset: %d" % rec_tloc
        if vlen:
            pnv = self.file.read(8)
            sprevdata = self.file.read(8)
            version = self.file.read(vlen)
            print >> self.dest, "version: %r" % version
            print >> self.dest, "non-version data offset: %d" % u64(pnv)
            print >> self.dest, \
                  "previous version data offset: %d" % u64(sprevdata)
        print >> self.dest, 'numrefs:', nrefs
        for ref in splitrefs(self.file.read(nrefs * 8)):
            print >> self.dest, '\t%s' % fmt(ref)
        print >> self.dest, "len(data): %d" % dlen
        data = self.file.read(dlen)
        # A debugging feature for use with the test suite.
        if data.startswith("(czodb.storage.tests.minpo\nMinPO\n"):
            print >> self.dest, "value: %r" % zodb_unpickle(data).value
        if not dlen:
            # A record without data carries an 8-byte backpointer instead.
            sbp = self.file.read(8)
            print >> self.dest, "backpointer: %d" % u64(sbp)

if __name__ == "__main__":
    # Script entry point: dump the file named by the first command-line
    # argument to standard output.
    import sys
    dump(sys.argv[1])


=== Added File Zope3/src/zodb/storage/file/errors.py ===
"""FileStorage-specific exceptions."""

from zodb.interfaces import _fmt_oid
from zodb.storage.interfaces import StorageError, StorageSystemError

class FileStorageError(StorageError):
    """Base class of the FileStorage-specific exceptions in this module."""

class PackError(FileStorageError):
    """FileStorage error related to packing (going by its name)."""

class FileStorageFormatError(FileStorageError):
    """The given file does not have a valid FileStorage format."""

class CorruptedFileStorageError(FileStorageError, StorageSystemError):
    """The file storage is corrupted."""

class CorruptedTransactionError(CorruptedFileStorageError):
    """Corruption error concerning a transaction (going by its name)."""

class CorruptedDataError(CorruptedFileStorageError):
    """Corruption error that records the oid, the data found and its position.

    oid -- oid being read, if known
    buf -- the data that was found
    pos -- file position, if known
    """

    def __init__(self, oid=None, buf=None, pos=None):
        self.oid, self.buf, self.pos = oid, buf, pos

    def __str__(self):
        # The oid and position are only mentioned when they are true values.
        if not self.oid:
            text = "Error reading unknown oid.  Found %r" % self.buf
        else:
            text = "Error reading oid %s.  Found %r" % (_fmt_oid(self.oid),
                                                        self.buf)
        if self.pos:
            text = text + (" at %d" % self.pos)
        return text

class FileStorageQuotaError(FileStorageError, StorageSystemError):
    """The quota of the file storage has been exceeded."""



=== Added File Zope3/src/zodb/storage/file/format.py === (419/519 lines abridged)
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tools for working with the low-level FileStorage format.

Files are arranged as follows.

  - The first 1024 bytes are a storage metadata section.

  In this section, the first two bytes are the characters F and S.

  The next two bytes are a storage format version id, currently "42".

  The next section is a four-byte database version string, encoded as
  byte 0: major version number
  byte 1: minor version number
  byte 2: micro version number
  byte 3: release level + serialno
  (see zodb.storage.base for details)

  The rest of the section is reserved.

A transaction record consists of:

  - 8-byte transaction id, which is also a time stamp.

  - 8-byte transaction record length - 8.

  - 1-byte status code

  - 2-byte length of user name

  - 2-byte length of description

  - 2-byte length of extension attributes

  -   user name

  -   description


[-=- -=- -=- 419 lines omitted -=- -=- -=-]

                        self.tloc, self.vlen, self.nrefs, self.plen)
        if self.version:
            v = struct.pack(">QQ", self.pnv, self.vprev)
            return s + v + self.version
        else:
            return s

    def setVersion(self, version, pnv, vprev):
        """Record version information on this header.

        Stores the version name, its length (as vlen) and the two
        version pointers pnv and vprev.
        """
        self.version = version
        name_length = len(version)
        self.vlen = name_length
        self.pnv, self.vprev = pnv, vprev

    def parseVersion(self, buf):
        """Fill in pnv, vprev and version from the string buf.

        The first 16 bytes of buf are two big-endian unsigned 64-bit
        numbers (pnv, then vprev); everything after them is the version
        name.
        """
        pointers, name = buf[:16], buf[16:]
        self.pnv, self.vprev = struct.unpack(">QQ", pointers)
        self.version = name

    def recordlen(self):
        """Return the total length of the data record this header describes.

        That is the fixed header, 8 bytes per reference, the pickle (or
        8 bytes when plen is zero) and, for version records, 16 bytes
        plus the length of the version name.
        """
        # A record with no pickle still occupies 8 bytes after the refs.
        payload = self.plen or 8
        total = DATA_HDR_LEN + (self.nrefs * 8) + payload
        if self.version:
            total = total + 16 + self.vlen
        return total

class TxnHeader:
    """Header for a transaction record.

    The constructor takes the six fixed-size header fields (tid, tlen,
    status and the lengths of the user, description and extension
    strings).  The strings themselves are kept in the user, descr and
    ext attributes; they start out empty and are expected to be filled
    in by whoever reads or builds the record.
    """

    __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
                 "ulen", "dlen", "elen")

    def __init__(self, tid, tlen, status, ulen, dlen, elen):
        self.tid = tid
        self.tlen = tlen
        self.status = status
        self.ulen = ulen
        self.dlen = dlen
        self.elen = elen
        # Default the variable-length parts so that asString() works on a
        # header whose strings were never assigned.
        self.user = ""
        self.descr = ""
        self.ext = ""

    def fromString(cls, s):
        """Build a header from the packed fixed-size header string s."""
        return cls(*struct.unpack(TRANS_HDR, s))

    fromString = classmethod(fromString)

    def asString(self):
        """Return the packed header followed by user, descr and ext."""
        s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
                        self.ulen, self.dlen, self.elen)
        return "".join([s, self.user, self.descr, self.ext])

    def headerlen(self):
        """Return the header length including user, descr and ext."""
        return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen



=== Added File Zope3/src/zodb/storage/file/index.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implement an OID to File-position (long integer) mapping."""

# To save space, we do two things:
#
#     1. We split the keys (OIDS) into 6-byte prefixes and 2-byte suffixes.
#        We use the prefixes as keys in a mapping from prefix to mappings
#        of suffix to data:
#
#           data is  {prefix -> {suffix -> data}}
#
#     2. We limit the data size to 48 bits. This should allow databases
#        as large as 256 terabytes.
#
# Most of the space is consumed by items in the mappings from 2-byte
# suffix to 6-byte data. This should reduce the overall memory usage to
# 8-16 bytes per OID.
#
# We use p64 to convert integers to 8-byte strings and lop off the two
# high-order bytes when saving. On loading data, we add the leading
# bytes back before using u64 to convert the data back to (long)
# integers.

from __future__ import generators
import struct

from zodb.btrees._fsBTree import fsBTree as _fsBTree

# convert between numbers and six-byte strings

# 2**32: num2str() divides by this to split a number into the 16-bit
# high part and 32-bit low part that make up a six-byte string.
_t32 = 1L<< 32

def num2str(n):
    """Pack the number n into a six-byte big-endian string."""
    # divmod() yields (high 16 bits, low 32 bits), matching ">HI".
    return struct.pack(">HI", *divmod(long(n), _t32))

def str2num(s):
    """Unpack a six-byte big-endian string produced by num2str()."""
    high, low = struct.unpack(">HI", s)
    if not high:
        return low
    # Convert to long before shifting so the high part is not lost.
    return (long(high) << 32) + low

class fsIndex:
    """Mapping from 8-byte keys to numbers, stored compactly.

    Each key is split into a 6-byte prefix and a 2-byte suffix.  The
    prefix selects an fsBTree in self._data; the suffix is the key
    within that tree, and the value is kept as a six-byte string made
    by num2str().
    """

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        prefix, suffix = key[:6], key[6:]
        return str2num(self._data[prefix][suffix])

    def get(self, key, default=None):
        # default doubles as the "missing" marker for both lookups.
        tree = self._data.get(key[:6], default)
        if tree is not default:
            raw = tree.get(key[6:], default)
            if raw is not default:
                return str2num(raw)
        return default

    def __setitem__(self, key, value):
        packed = num2str(value)
        prefix = key[:6]
        tree = self._data.get(prefix)
        if tree is None:
            # First key with this prefix: create its tree.
            tree = self._data[prefix] = _fsBTree()
        tree[key[6:]] = packed

    def __len__(self):
        total = 0
        for tree in self._data.values():
            total = total + len(tree)
        return total

    def update(self, mapping):
        for key, value in mapping.items():
            self.__setitem__(key, value)

    def has_key(self, key):
        # self serves as a marker object that can never be a stored value.
        missing = self
        return self.get(key, missing) is not missing

    def __contains__(self, key):
        tree = self._data.get(key[:6])
        if tree is not None:
            if tree.get(key[6:], None) is not None:
                return True
        return False

    def clear(self):
        self._data.clear()

    def __iter__(self):
        for head, tree in self._data.items():
            for tail in tree:
                yield head + tail

    def keys(self):
        return [head + tail
                for head, tree in self._data.items()
                for tail in tree.keys()]

    def items(self):
        return [((head + tail), str2num(raw))
                for head, tree in self._data.items()
                for tail, raw in tree.items()]

    def values(self):
        return [str2num(raw)
                for tree in self._data.values()
                for raw in tree.values()]


=== Added File Zope3/src/zodb/storage/file/main.py === (1150/1250 lines abridged)
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""File-based ZODB storage

$Id: main.py,v 1.1.2.1 2003/04/16 18:12:32 jeremy Exp $
"""

from __future__ import generators

import os
import sys
import time
import errno
import base64
import struct
import logging
from struct import pack, unpack
from cPickle import Pickler, Unpickler, loads

try:
    from posix import fsync
except:
    fsync = None

import zodb.db
from zodb.storage.base import BaseStorage, splitrefs
from zodb import conflict
from zodb.interfaces import *
from zodb.timestamp import TimeStamp, newTimeStamp, timeStampFromTime
from zodb.lockfile import LockFile
from zodb.utils import p64, u64, cp
from zodb.storage.file.index import fsIndex
from zodb.storage.interfaces import *

from zodb.storage.file.copy import DataCopier
from zodb.storage.file.errors import *
from zodb.storage.file.format \
     import FileStorageFormatter, DataHeader, TxnHeader
from zodb.storage.file.format \

[-=- -=- -=- 1150 lines omitted -=- -=- -=-]

        self.first = first
        self.last = last
        self.filter = filter
        self.i = 0
        self.results = []
        self.stop = 0

    def finished(self):
        """Return a true value once the search should not continue.

        That is the case when self.i has reached self.last, when
        self.pos has dropped below 1024, or when self.stop is set.
        """
        if self.i >= self.last:
            return True
        # The first txn record is at 1024, so pos must be >= 1024
        if self.pos < 1024:
            return True
        return self.stop

    def search(self):
        """Search for another record.

        Reads the next record with _readnext().  A record that passes
        self.filter (or any record when there is no filter) counts
        towards self.i, and is appended to self.results once self.i
        has reached self.first.
        """
        record = self._readnext()
        if record is None:
            return
        if self.filter is None or self.filter(record):
            if self.i >= self.first:
                self.results.append(record)
            self.i += 1

    def _readnext(self):
        """Read the next record from the storage.

        Moves self.pos back by one transaction, using the length stored
        in the 8 bytes before the current position.  Returns a dict
        describing that transaction, or None when self.pos falls below
        1024 or when the transaction is older than self.packt or has
        status 'p' (in which case self.stop is also set).
        """
        self._file.seek(self.pos - 8)
        tlen = u64(self._file.read(8))
        self.pos -= tlen + 8
        if self.pos < 1024:
            return None
        h = self._read_txn_header(self.pos)
        if h.tid < self.packt or h.status == 'p':
            self.stop = 1
            return None
        assert h.status == " "
        record = {
            'id': base64.encodestring(h.tid).rstrip(),
            'time': TimeStamp(h.tid).timeTime(),
            'user_name': h.user,
            'description': h.descr,
            }
        if h.ext:
            # The extension data is a pickle; its contents are merged
            # into the result.
            record.update(loads(h.ext))
        return record

def cleanup(filename):
    """Remove all FileStorage related files."""
    for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
        try:
            os.remove(filename + ext)
        except OSError, e:
            if e.errno != errno.ENOENT:
                raise


=== Added File Zope3/src/zodb/storage/file/pack.py ===
"""FileStorage helper to find all reachable objects as of a certain time.

A storage contains an ordered set of object revisions.  When a storage
is packed, object revisions that are not reachable as of the pack time
are deleted.  The notion of reachability is complicated by
backpointers -- object revisions that point to earlier revisions of
the same object.

An object revision is reachable at a certain time if it is reachable
from the revision of the root at that time or if it is reachable from
a backpointer after that time.
"""

import os
import stat

from zodb.interfaces import ZERO
from zodb.utils import p64, u64
from zodb.storage.base import splitrefs
from zodb.storage.file.copy import DataCopier
from zodb.storage.file.format import FileStorageFormatter
from zodb.storage.file.index import fsIndex

class GC(FileStorageFormatter):
    """Work out which data records are reachable as of a pack time.

    file is the storage file to scan, eof the position at which to
    stop scanning and packtime the transaction id of the pack time.
    findReachable() fills in self.reachable.
    """

    def __init__(self, file, eof, packtime):
        self._file = file
        self.eof = eof
        self.packtime = packtime
        # packpos: position of first txn header after pack time
        self.packpos = None
        # XXX pos2oid may be unnecessary
        self.pos2oid = {} # maps data record position to oid
        self.oid2curpos = {} # maps oid to current data record position
        self.reachable = {} # maps oid to list of reachable data record pos

    # XXX what about versions?
    # need PackAfterUndoDeleteVersion

    def findReachable(self):
        """Build the pack index, then mark everything reachable.

        Nothing is marked when the root object (oid ZERO) has no
        record at or before the pack time.
        """
        self.buildPackIndex()
        if ZERO in self.oid2curpos:
            self.findReachableAtPacktime([ZERO])
            self.findReachableFromFuture()

    def buildPackIndex(self):
        """Index every data record up to the pack time.

        Fills pos2oid and oid2curpos, and sets packpos to the position
        of the first transaction after the pack time (or to the
        position where the scan ran out of file).
        """
        txn_pos = 1024
        while txn_pos < self.eof:
            th = self._read_txn_header(txn_pos)
            if th.tid > self.packtime:
                break

            txn_end = txn_pos + th.tlen
            rec_pos = txn_pos + th.headerlen()

            while rec_pos < txn_end:
                dh = self._read_data_header(rec_pos)
                self.pos2oid[rec_pos] = dh.oid
                self.oid2curpos[dh.oid] = rec_pos
                rec_pos += dh.recordlen()

            # The transaction ends with a second copy of its length.
            tlen = self._read_num(rec_pos)
            assert tlen == th.tlen
            txn_pos = rec_pos + 8

        self.packpos = txn_pos

    def findReachableAtPacktime(self, roots):
        """Mark all objects reachable from the oids in roots as reachable."""
        pending = list(roots)
        while pending:
            oid = pending.pop()
            if oid not in self.reachable:
                pos = self.oid2curpos[oid]
                self.reachable[oid] = [pos]
                pending.extend(self.findrefs(pos))

    def findReachableFromFuture(self):
        """Mark revisions kept alive by backpointers from after packpos."""
        # In this pass, the roots are positions of object revisions.
        # We add a pos to extra_roots when there is a backpointer to a
        # revision that was not current at the packtime.  The
        # non-current revision could refer to objects that were
        # otherwise unreachable at the packtime.
        extra_roots = []

        txn_pos = self.packpos
        while txn_pos < self.eof:
            th = self._read_txn_header(txn_pos)
            txn_end = txn_pos + th.tlen
            rec_pos = txn_pos + th.headerlen()

            while rec_pos < txn_end:
                dh = self._read_data_header(rec_pos)

                back = dh.back
                if back and back < self.packpos:
                    known = self.reachable.setdefault(dh.oid, [])
                    if back not in known:
                        known.append(back)
                        extra_roots.append(back)

                rec_pos += dh.recordlen()

            tlen = self._read_num(rec_pos)
            assert tlen == th.tlen
            txn_pos = rec_pos + 8

        for root_pos in extra_roots:
            self.findReachableAtPacktime(self.findrefs(root_pos))

    def findrefs(self, pos):
        """Return a list of oids referenced as of packtime."""
        dh = self._read_data_header(pos)
        # Chase backpointers until we get to the record with the refs
        back = dh.back
        while back:
            dh = self._read_data_header(back)
            back = dh.back
        # presumably _read_data_header() leaves the file positioned at
        # the record's references -- TODO confirm
        raw_refs = self._file.read(dh.nrefs * 8)
        return splitrefs(raw_refs)

class PackCopier(DataCopier):
    """DataCopier whose source and destination are the same file."""

    # PackCopier has to cope with _file and _tfile being the
    # same file.  The copy() implementation is written assuming
    # that they are different, so that using one object doesn't
    # mess up the file pointer for the other object.

    # PackCopier overrides _resolve_backpointer() and _restore_pnv()
    # to guarantee that they keep the file pointer for _tfile in
    # the right place.

    def __init__(self, f, index, vindex, tindex, tvindex):
        self._file = self._tfile = f
        self._index, self._vindex = index, vindex
        self._tindex, self._tvindex = tindex, tvindex
        self._pos = None

    def setTxnPos(self, pos):
        """Set the position that DataCopier uses as self._pos."""
        self._pos = pos

    def _keeping_tfile_pos(self, method, *args):
        # Call the DataCopier method and put the _tfile pointer back
        # where it was, whether the call returns or raises.
        saved = self._tfile.tell()
        try:
            return method(self, *args)
        finally:
            self._tfile.seek(saved)

    def _resolve_backpointer(self, prev_txn, oid, data):
        return self._keeping_tfile_pos(DataCopier._resolve_backpointer,
                                       prev_txn, oid, data)

    def _restore_pnv(self, oid, prev, version, bp):
        return self._keeping_tfile_pos(DataCopier._restore_pnv,
                                       oid, prev, version, bp)

class FileStoragePacker(FileStorageFormatter):

    def __init__(self, path, stop, la, lr, cla, clr):
        """Open the storage file at path and set up the packer's state.

        path -- name of the file to pack; it is opened read-only
        stop -- pack time, handed on to the GC object
        la, lr -- acquire and release callables for the parent's lock
        cla, clr -- acquire and release callables for the parent's
                    commit lock
        """
        self._name = path
        self._stop = stop
        self._packt = None
        self.locked = False

        # The input file, its size, and the reachability analysis
        # that will read it.
        self._file = open(path, "rb")
        self.file_end = os.path.getsize(self._file.name)
        self.gc = GC(self._file, self.file_end, self._stop)

        # The packer needs to acquire the parent's commit lock during
        # the copying stage, which is why both pairs of lock methods
        # are handed to the constructor.
        self._lock_acquire, self._lock_release = la, lr
        self._commit_lock_acquire, self._commit_lock_release = cla, clr

        # Indexes used while packing:
        #   index   -- oid -> pos
        #   vindex  -- version -> pos of XXX
        #   tindex  -- oid -> pos, for current txn
        #   tvindex -- version -> pos of XXX, for current txn
        #   nvindex -- index for non-version data; a temporary
        #              structure to reduce I/O during packing
        self.index = fsIndex()
        self.nvindex = fsIndex()
        self.vindex = {}
        self.tindex = {}
        self.tvindex = {}

    def _loada(self, oid, _index):
        """Load the record for oid found through _index, whatever its version.

        Returns a (data, refs, version) triple.  Raises POSKeyError if
        oid is not a key of _index.
        """
        try:
            where = _index[oid]
        except KeyError:
            raise POSKeyError(oid)
        header = self._read_data_header(where)
        raw_refs = self._file.read(header.nrefs * 8)
        refs = splitrefs(raw_refs)
        if not header.plen:
            # No data stored in this record: fetch it through the
            # backpointer.  _loadBack() leaves the file position
            # sitting at the right place.
            return self._loadBack(oid, header.back)[0], refs, header.version
        return self._file.read(header.plen), refs, header.version

    def _redundant_pack(self, pos):
        """Return True if the status byte found before pos is not a space.

        The 8 bytes immediately before pos are read as a length; the
        byte examined lies at pos - length + 8.
        NOTE(review): presumably pos is the end of a transaction and the
        byte is that transaction's status flag -- confirm.
        """
        assert pos > 8, pos
        f = self._file
        f.seek(pos - 8)
        length = u64(f.read(8))
        f.seek(pos - length + 8)
        status = f.read(1)
        return status != ' ' # XXX or == "p"?

    def pack(self):
        """Copy all data reachable at the pack time or later to a new file.

        The output is written to self._name + ".pack".

        Returns the position of the end of the packed file, or None if
        packing would not free any space; in that case the ".pack" file
        is removed again.  On both return paths the input file and the
        output file are closed.
        """
        # Copying occurs in two phases.  In the first phase, txns
        # before the pack time are copied if they contain any reachable
        # data.  In the second phase, all txns after the pack time
        # are copied.
        #
        # Txn and data records contain pointers to previous records.
        # Because these pointers are stored as file offsets, they
        # must be updated when we copy data.

        # XXX Need to add sanity checking to pack

        self.gc.findReachable()

        # Setup the destination file and copy the metadata.
        # XXX rename from _tfile to something clearer
        self._tfile = open(self._name + ".pack", "w+b")
        self._file.seek(0)
        self._tfile.write(self._file.read(self._metadata_size))

        self._copier = PackCopier(self._tfile, self.index, self.vindex,
                                  self.tindex, self.tvindex)

        ipos, opos = self.copyToPacktime()
        assert ipos == self.gc.packpos
        if ipos == opos:
            # pack didn't free any data.  there's no point in continuing.
            # Close the input file too, so that this path releases the
            # same resources as the normal path below.
            self._tfile.close()
            self._file.close()
            os.remove(self._name + ".pack")
            return None
        if ipos < self.file_end:
            self.copyRest(ipos)

        # OK, we've copied everything. Now we need to wrap things up.
        pos = self._tfile.tell()
        self._tfile.flush()
        self._tfile.close()
        self._file.close()

        return pos

    def isCurNonversion(self, h, nvpos, curpos):
        """Return True if nvpos holds the current non-version data for h.

        h -- data header of the candidate record
        nvpos -- file position of the candidate record
        curpos -- file position of the object's current record, or a
                  false value if there is none

        The answer is True only when the record at curpos is in a
        version, has a pnv pointer, and that pointer resolves (via
        _loadBackPOS) to nvpos.  The input file is left at the position
        it had before the call.
        """
        if not curpos:
            return False
        saved = self._file.tell()
        result = False
        try:
            # XXX probably don't want to read this data everytime.
            # instead, keep a cache of whether curpos is in a version.
            # note that entries can be thrown out after passing curpos.
            current = self._read_data_header(curpos)
            assert current.oid == h.oid
            if current.version:
                # Skip over the refs that follow the header.
                self._file.read(current.nrefs * 8)
                # Without a pnv the object was created in a version,
                # so there is no non-version data at all.  Otherwise
                # chase backpointers until we get to the actual data.
                if current.pnv:
                    target = self._loadBackPOS(current.oid, current.pnv)
                    result = target == nvpos
        finally:
            self._file.seek(saved)
        return result

    def copyToPacktime(self):
        """Copy reachable data from every txn before self.gc.packpos.

        Transactions are visited in file order starting just after the
        metadata.  copyDataRecords() decides which records to copy; for
        each transaction that had something copied, the transaction
        length is patched into the output.

        Returns (pos, new_pos): the input position reached and the
        output position just past the last transaction written (equal
        to the metadata size if nothing was written).
        """
        pos = self._metadata_size
        new_pos = pos

        while pos < self.gc.packpos:
            th = self._read_txn_header(pos)
            new_tpos, pos = self.copyDataRecords(pos, th)

            if new_tpos:
                # The output is positioned after the last data record;
                # the transaction ends 8 bytes later, after the
                # redundant length.
                new_pos = self._tfile.tell() + 8
                tlen = new_pos - new_tpos - 8
                # Update the transaction length, first in the header
                # (8 bytes into the txn) and then at the end of the txn.
                self._tfile.seek(new_tpos + 8)
                self._tfile.write(p64(tlen))
                self._tfile.seek(new_pos - 8)
                self._tfile.write(p64(tlen))

            # skip the end-of-transaction redundant length
            pos += 8

        return pos, new_pos

    def copyDataRecords(self, pos, th):
        """Copy the reachable data records of one transaction.

        pos -- input file position of the transaction header
        th -- the transaction header that was read from pos

        A record is copied only if its position is listed in
        self.gc.reachable for its oid.  Before the first record is
        copied, th is written to the output file with its status set
        to "p" (th is modified in place); the caller has to patch up
        the transaction length afterwards.

        Returns (new_tpos, pos): the position of the txn header in the
        output file (0 if nothing was copied) and the input position
        just past the transaction's data records.
        """
        copy = False
        new_tpos = 0
        tend = pos + th.tlen
        pos += th.headerlen()
        while pos < tend:
            h = self._read_data_header(pos)
            reachable = pos in self.gc.reachable.get(h.oid, [])
            pos += h.recordlen()
            if not reachable:
                continue

            # If we are going to copy any data, we need to copy
            # the transaction header.  Note that we will need to
            # patch up the transaction length when we are done.
            if not copy:
                th.status = "p"
                new_tpos = self._tfile.tell()
                self._tfile.write(th.asString())
                copy = True

            if h.plen:
                refs = self._file.read(8 * h.nrefs)
                data = self._file.read(h.plen)
            else:
                # If a current record has a backpointer, fetch
                # refs and data from the backpointer.  We need
                # to write the data in the new record.
                data, refs, serial, tid = self._loadBackTxn(h.oid, h.back)
                refs = "".join(refs)

            self.writePackedDataRecord(h, data, refs, new_tpos)

        return new_tpos, pos

    def VERSIONcopyDataRecords(self, pos, th):
        """Copy the current data records of one transaction.

        Version-aware variant of copyDataRecords(): a record is copied
        if it is the current record for its oid according to
        self.pindex, or if isCurNonversion() reports that it is the
        current non-version data behind a version record.

        pos -- input file position of the transaction header
        th -- the transaction header read from pos; its status is set
              to "p" when it is written to the output

        Returns position of txn header in output file (0 if nothing
        was copied) and position of next record in the input file.

        NOTE(review): self.pindex is not assigned in this class's
        __init__, and pack() reaches copyDataRecords() rather than this
        method; this looks like code kept aside for later -- confirm.
        """
        copy = False
        new_tpos = 0
        tend = pos + th.tlen
        pos += th.headerlen()
        while pos < tend:
            h = self._read_data_header(pos)
            cur_nonversion = False

            # If this data record isn't current, don't copy it.
            # If the current record is in a version, this may be
            # the current non-version data.
            curpos = self.pindex.get(h.oid, 0)
            if curpos != pos:
                if self.isCurNonversion(h, pos, curpos):
                    cur_nonversion = True
                else:
                    pos += h.recordlen()
                    continue
            pos += h.recordlen()

            # If we are going to copy any data, we need to copy
            # the transaction header.  Note that we will need to
            # patch up the transaction length when we are done.
            if not copy:
                th.status = "p"
                s = th.asString()
                new_tpos = self._tfile.tell()
                self._tfile.write(s)
                # new_pos tracks the output position just past what has
                # been written so far for this transaction.
                new_pos = new_tpos + len(s)
                copy = True

            # Remember the output position recorded in new_pos for this
            # oid's non-version data; writePackedDataRecord() reads
            # nvindex to fill in pnv for version records.
            if cur_nonversion:
                self.nvindex[h.oid] = new_pos

            if h.plen:
                refs = self._file.read(8 * h.nrefs)
                data = self._file.read(h.plen)
            else:
                # If a current record has a backpointer, fetch
                # refs and data from the backpointer.  We need
                # to write the data in the new record.
                data, refs, serial, tid = self._loadBackTxn(h.oid, h.back)
                refs = "".join(refs)

            self.writePackedDataRecord(h, data, refs, new_tpos)
            new_pos = self._tfile.tell()

        return new_tpos, pos

    def writePackedDataRecord(self, h, data, refs, new_tpos):
        """Write one data record at the current output file position.

        h -- data header read from the input file; it is updated in
             place to describe the record as written
        data -- the record's data as a string
        refs -- the record's refs as one string, 8 bytes per ref
        new_tpos -- output file position of the enclosing txn header

        The record's position is entered in self.index under h.oid and,
        for version records, in self.vindex under h.version.
        """
        # Update the header to reflect current information, then write
        # it to the output file.  The data is always written inline, so
        # the prev and back pointers are cleared.
        h.prev = 0
        h.back = 0
        h.plen = len(data)
        # Floor division keeps nrefs an integer even where "/" means
        # true division.
        h.nrefs = len(refs) // 8
        h.tloc = new_tpos
        if h.version:
            h.pnv = self.nvindex.get(h.oid, 0)
            # Read the previous record for this version before vindex
            # is pointed at the record being written.
            h.vprev = self.vindex.get(h.version, 0)
        pos = self._tfile.tell()
        self.index[h.oid] = pos
        if h.version:
            self.vindex[h.version] = pos
        self._tfile.write(h.asString())
        self._tfile.write(refs)
        self._tfile.write(data)

    def copyRest(self, ipos):
        """Copy every transaction from ipos to the end of the input file.

        After the pack time all data records are kept.  Transactions
        are copied one at a time: the header is written directly and
        each data record goes through self._copier.copy().  When a
        transaction has been copied, the per-transaction indexes are
        merged into index and vindex and then emptied.
        """
        out = self._tfile
        while ipos < self.file_end:
            txn = self._read_txn_header(ipos)
            txn_start = out.tell()
            self._copier.setTxnPos(txn_start)
            out.write(txn.asString())
            txn_end = ipos + txn.tlen
            ipos += txn.headerlen()

            while ipos < txn_end:
                rec = self._read_data_header(ipos)
                ipos += rec.recordlen()
                refs = []
                if rec.nrefs:
                    refs = splitrefs(self._file.read(rec.nrefs * 8))
                prev_txn = None
                if rec.plen:
                    data = self._file.read(rec.plen)
                elif rec.back:
                    # No inline data: take data and refs from the
                    # record the backpointer leads to.
                    data, refs, _serial, _tid = self._loadBackTxn(rec.oid,
                                                                  rec.back)
                    prev_txn = self.getTxnFromData(rec.oid, rec.back)
                else:
                    # A zero backpointer means this transaction undoes
                    # the object creation: it either aborts the version
                    # that created the object or undid the transaction
                    # that created it.  Pass None for data and refs.
                    data = refs = None

                self._copier.copy(rec.oid, rec.serial, data, refs,
                                  rec.version, prev_txn, txn_start,
                                  out.tell())

            end = out.tell()
            new_tlen = end - txn_start
            if new_tlen != txn.tlen:
                # Copying the data records changed their sizes, so the
                # header has to be rewritten with the new length.
                out.seek(txn_start)
                txn.tlen = new_tlen
                out.write(txn.asString())
                out.seek(end)
            out.write(p64(new_tlen))
            # Skip the redundant length that ends the input txn.
            ipos += 8

            self.index.update(self.tindex)
            self.tindex.clear()
            self.vindex.update(self.tvindex)
            self.tvindex.clear()




More information about the Zodb-checkins mailing list