[Zodb-checkins] CVS: ZODB3/ZODB/FileStorage - FileStorage.py:1.1.2.1 __init__.py:1.1.2.1 format.py:1.1.2.1 fspack.py:1.1.2.1
Jeremy Hylton
jeremy at zope.com
Tue Oct 7 14:16:36 EDT 2003
Update of /cvs-repository/ZODB3/ZODB/FileStorage
In directory cvs.zope.org:/tmp/cvs-serv26281/ZODB/FileStorage
Added Files:
Tag: ZODB3-mvcc-2-branch
FileStorage.py __init__.py format.py fspack.py
Log Message:
Replace FileStorage module with a package.
Extract format code from fspack and begin integration with the
main-line of FileStorage.
=== Added File ZODB3/ZODB/FileStorage/FileStorage.py === (1909/2309 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
#
# File-based ZODB storage
#
# Files are arranged as follows.
#
# - The first 4 bytes are a file identifier.
#
# - The rest of the file consists of a sequence of transaction
# "records".
#
# A transaction record consists of:
#
# - 8-byte transaction id, which is also a time stamp.
#
# - 8-byte transaction record length - 8.
#
# - 1-byte status code
#
# - 2-byte length of user name
#
# - 2-byte length of description
#
# - 2-byte length of extension attributes
#
# - user name
#
# - description
#
# - extension attributes
#
# * A sequence of data records
#
# - 8-byte redundant transaction length - 8
#
# A data record consists of
#
# - 8-byte oid.
#
# - 8-byte serial, which is a time stamp that matches the
# transaction timestamp.
#
# - 8-byte previous-record file-position.
#
# - 8-byte beginning of transaction record file position.
#
# - 2-byte version length
#
# - 8-byte data length
#
# ? 8-byte position of non-version data
# (if version length > 0)
#
# ? 8-byte position of previous record in this version
# (if version length > 0)
#
# ? version string
# (if version length > 0)
#
# ? data
# (if data length > 0)
#
# ? 8-byte position of data record containing data
# (if data length == 0)
#
# Note that the lengths and positions are all big-endian.
# Also, the object ids and time stamps are big-endian, so
# comparisons are meaningful. (A short sketch of reading this
# layout follows this comment block.)
#
# Version handling
#
# There isn't a separate store for versions. Each record has a
# version field, indicating what version it is in. The records in a
# version form a linked list. Each record that has a non-empty
# version string has a pointer to the previous record in the version.
# Version back pointers are retained *even* when versions are
# committed or aborted or when transactions are undone.
#
# There is a notion of "current" version records, which are the
# records in a version that are the current records for their
# respective objects. When a version is committed, the current records
# are committed to the destination version. When a version is
# aborted, the current records are aborted.
#
# When committing or aborting, we search backward through the linked
# list until we find a record for an object that does not have a
# current record in the version. If we find a record for which the
# non-version pointer is the same as the previous pointer, then we
# forget that the corresponding object had a current record in the
# version. This strategy allows us to avoid searching backward through
# previously committed or aborted version records.
#
# Of course, we ignore records in undone transactions when committing
# or aborting.
#
# Backpointers
#
# When we commit or abort a version, we don't copy (or delete)
# any data. Instead, we write records with back pointers.
#
# A version record *never* has a back pointer to a non-version
# record, because we never abort to a version. A non-version record
# may have a back pointer to a version record or to a non-version
# record.
#
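# A minimal sketch of reading this layout directly. The file name
# "Data.fs" is a stand-in, the format strings are transcribed from
# the description above, and it assumes the file already holds at
# least one transaction.
import struct

def read_first_txn(path="Data.fs"):
    f = open(path, "rb")
    magic = f.read(4)                  # 4-byte file identifier
    pos = 4                            # first transaction record
    tid, stl, status, ul, dl, el = struct.unpack(">8s8scHHH", f.read(23))
    tlen = struct.unpack(">Q", stl)[0] # transaction record length - 8
    user = f.read(ul)
    description = f.read(dl)
    ext = f.read(el)
    # Data records fill the rest; the redundant copy of the length
    # trails them at pos + tlen.
    f.seek(pos + tlen)
    assert f.read(8) == stl            # the two length copies must agree
    f.close()
    return tid, status, user, description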
__version__='$Revision: 1.1.2.1 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
import errno
import os
import struct
import sys
import time
from types import StringType, DictType
from struct import pack, unpack
# Not all platforms have fsync
fsync = getattr(os, "fsync", None)
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException \
import UndoError, POSKeyError, MultipleUndoErrors, VersionLockError
from ZODB.TimeStamp import TimeStamp
from ZODB.lock_file import LockFile
from ZODB.utils import p64, u64, cp, z64
from ZODB.fspack import FileStoragePacker
from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
try:
from ZODB.fsIndex import fsIndex
except ImportError:
def fsIndex():
return {}
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
t32 = 1L << 32
# the first four bytes of the file; also used to tag log messages
packed_version = "FS21"
# the struct formats for the headers
TRANS_HDR = ">8s8scHHH"
DATA_HDR = ">8s8s8s8sH8s"
# constants to support various header sizes
TRANS_HDR_LEN = 23
DATA_HDR_LEN = 42
DATA_VERSION_HDR_LEN = 58
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
def blather(message, *data):
LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
message % data))
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
message % data))
def error(message, *data):
LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
message % data))
def nearPanic(message, *data):
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
message % data))
def panic(message, *data):
message = message % data
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, message))
raise CorruptedTransactionError(message)
class FileStorageError(POSException.StorageError):
pass
class PackError(FileStorageError):
pass
class FileStorageFormatError(FileStorageError):
"""Invalid file format
The format of the given file is not valid.
"""
class CorruptedFileStorageError(FileStorageError,
POSException.StorageSystemError):
"""Corrupted file storage."""
class CorruptedTransactionError(CorruptedFileStorageError):
pass
[-=- -=- -=- 1909 lines omitted -=- -=- -=-]
if pos+(tl+8) > self._file_size:
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
warn("%s truncated, possibly due to damaged records at %s",
self._file.name, pos)
break
if status not in ' up':
warn('%s has invalid status, %s, at %s', self._file.name,
status, pos)
if tl < (TRANS_HDR_LEN+ul+dl+el):
# We're in trouble. Find out if this is bad data in
# the middle of the file, or just a turd that Win 9x
# dropped at the end when the system crashed. Skip to
# the end and read what should be the transaction
# length of the last transaction.
seek(-8, 2)
rtl=u64(read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
nearPanic('%s has invalid transaction header at %s',
self._file.name, pos)
warn("It appears that there is invalid data at the end of "
"the file, possibly due to a system crash. %s "
"truncated to recover from bad data at end."
% self._file.name)
break
else:
warn('%s has invalid transaction header at %s',
self._file.name, pos)
break
tpos=pos
tend=tpos+tl
if status=='u':
# Undone transaction, skip it
seek(tend)
h=read(8)
if h != stl:
panic('%s has inconsistent transaction length at %s',
self._file.name, pos)
pos=tend+8
continue
pos=tpos+(TRANS_HDR_LEN+ul+dl+el)
user=read(ul)
description=read(dl)
if el:
try: e=loads(read(el))
except: e={}
else: e={}
result = RecordIterator(tid, status, user, description, e, pos,
tend, file, tpos)
pos = tend
# Read the (intentionally redundant) transaction length
seek(pos)
h = read(8)
if h != stl:
warn("%s redundant transaction length check failed at %s",
self._file.name, pos)
break
self._pos = pos + 8
return result
raise IndexError, index
class RecordIterator(Iterator, BaseStorage.TransactionRecord):
"""Iterate over the transactions in a FileStorage file."""
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
self.tid = tid
self.status = status
self.user = user
self.description = desc
self._extension = ext
self._pos = pos
self._tend = tend
self._file = file
self._tpos = tpos
def next(self, index=0):
pos = self._pos
while pos < self._tend:
# Read the data records for this transaction
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
prev = u64(sprev)
tloc = u64(stloc)
plen = u64(splen)
dlen = DATA_HDR_LEN + (plen or 8)
if vlen:
dlen += (16 + vlen)
tmp = self._file.read(16)
pv = u64(tmp[8:16])
version = self._file.read(vlen)
else:
version = ''
datapos = pos + DATA_HDR_LEN
if vlen:
datapos += 16 + vlen
assert self._file.tell() == datapos, (self._file.tell(), datapos)
if pos + dlen > self._tend or tloc != self._tpos:
warn("%s data record exceeds transaction record at %s",
self._file.name, pos)
break
self._pos = pos + dlen
prev_txn = None
if plen:
data = self._file.read(plen)
else:
bp = self._file.read(8)
if bp == z64:
# If the backpointer is 0 (encoded as z64), then
# this transaction undoes the object creation. It
# either aborts the version that created the
# object or undoes the transaction that created it.
# Return None instead of a pickle to indicate
# this.
data = None
else:
data, _s, tid = _loadBackTxn(self._file, oid, bp)
prev_txn = getTxnFromData(self._file, oid, bp)
r = Record(oid, serial, version, data, prev_txn)
return r
raise IndexError, index
class Record(BaseStorage.DataRecord):
"""An abstract database record."""
def __init__(self, *args):
self.oid, self.serial, self.version, self.data, self.data_txn = args
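# A hedged usage sketch of the iterator protocol above. The Iterator
# base class maps the old __getitem__ for-loop protocol onto next(),
# which raises IndexError at the end. FileIterator is assumed to be
# the transaction-level iterator defined in the abridged portion of
# this file; the path is a stand-in.
def dump_transactions(path="Data.fs"):
    for txn in FileIterator(path):
        print TimeStamp(txn.tid), txn.status, txn.user
        for rec in txn:              # a RecordIterator over data records
            if rec.data is None:
                print " ", u64(rec.oid), "(creation undone)"
            else:
                print " ", u64(rec.oid), len(rec.data)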
class UndoSearch:
def __init__(self, file, pos, packt, first, last, filter=None):
self.file = file
self.pos = pos
self.packt = packt
self.first = first
self.last = last
self.filter = filter
self.i = 0
self.results = []
self.stop = 0
def finished(self):
"""Return True if UndoSearch has found enough records."""
# BAW: Why 39 please? This makes no sense (see also below).
return self.i >= self.last or self.pos < 39 or self.stop
def search(self):
"""Search for another record."""
dict = self._readnext()
if dict is not None and (self.filter is None or self.filter(dict)):
if self.i >= self.first:
self.results.append(dict)
self.i += 1
def _readnext(self):
"""Read the next record from the storage."""
self.file.seek(self.pos - 8)
self.pos -= u64(self.file.read(8)) + 8
self.file.seek(self.pos)
h = self.file.read(TRANS_HDR_LEN)
tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
if tid < self.packt or status == 'p':
self.stop = 1
return None
if status != ' ':
return None
d = u = ''
if ul:
u = self.file.read(ul)
if dl:
d = self.file.read(dl)
e = {}
if el:
try:
e = loads(self.file.read(el))
except:
pass
d = {'id': base64.encodestring(tid).rstrip(),
'time': TimeStamp(tid).timeTime(),
'user_name': u,
'description': d}
d.update(e)
return d
=== Added File ZODB3/ZODB/FileStorage/__init__.py ===
# this is a package
from ZODB.FileStorage.FileStorage import FileStorage, RecordIterator
=== Added File ZODB3/ZODB/FileStorage/format.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
#
# File-based ZODB storage
#
# Files are arranged as follows.
#
# - The first 4 bytes are a file identifier.
#
# - The rest of the file consists of a sequence of transaction
# "records".
#
# A transaction record consists of:
#
# - 8-byte transaction id, which is also a time stamp.
#
# - 8-byte transaction record length - 8.
#
# - 1-byte status code
#
# - 2-byte length of user name
#
# - 2-byte length of description
#
# - 2-byte length of extension attributes
#
# - user name
#
# - description
#
# - extension attributes
#
# * A sequence of data records
#
# - 8-byte redundant transaction length - 8
#
# A data record consists of
#
# - 8-byte oid.
#
# - 8-byte serial, which is a time stamp that matches the
# transaction timestamp.
#
# - 8-byte previous-record file-position.
#
# - 8-byte beginning of transaction record file position.
#
# - 2-byte version length
#
# - 8-byte data length
#
# ? 8-byte position of non-version data
# (if version length > 0)
#
# ? 8-byte position of previous record in this version
# (if version length > 0)
#
# ? version string
# (if version length > 0)
#
# ? data
# (if data length > 0)
#
# ? 8-byte position of data record containing data
# (if data length == 0)
#
# Note that the lengths and positions are all big-endian.
# Also, the object ids and time stamps are big-endian, so
# comparisons are meaningful.
#
# Version handling
#
# There isn't a separate store for versions. Each record has a
# version field, indicating what version it is in. The records in a
# version form a linked list. Each record that has a non-empty
# version string has a pointer to the previous record in the version.
# Version back pointers are retained *even* when versions are
# committed or aborted or when transactions are undone.
#
# There is a notion of "current" version records, which are the
# records in a version that are the current records for their
# respective objects. When a version is committed, the current records
# are committed to the destination version. When a version is
# aborted, the current records are aborted.
#
# When committing or aborting, we search backward through the linked
# list until we find a record for an object that does not have a
# current record in the version. If we find a record for which the
# non-version pointer is the same as the previous pointer, then we
# forget that the corresponding object had a current record in the
# version. This strategy allows us to avoid searching backward through
# previously committed or aborted version records.
#
# Of course, we ignore records in undone transactions when committing
# or aborting.
#
# Backpointers
#
# When we commit or abort a version, we don't copy (or delete)
# any data. Instead, we write records with back pointers. (A
# sketch of chasing backpointers follows this comment block.)
#
# A version record *never* has a back pointer to a non-version
# record, because we never abort to a version. A non-version record
# may have a back pointer to a version record or to a non-version
# record.
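# The backpointer rules above can be exercised directly against the
# record layout. A minimal sketch, independent of the classes below:
# given an open .fs file and the position of a data record, chase the
# chain of backpointers to the record that actually holds the pickle.
import struct

def chase_back(f, pos):
    while 1:
        f.seek(pos)
        oid, serial, prev, tloc, vlen, splen = struct.unpack(
            ">8s8s8s8sH8s", f.read(42))
        plen = struct.unpack(">Q", splen)[0]
        if plen:
            return pos, plen         # the pickle is stored here
        if vlen:
            f.seek(16 + vlen, 1)     # skip pnv, vprev, version string
        back = struct.unpack(">Q", f.read(8))[0]
        if not back:
            return pos, 0            # zero backpointer: creation undone
        pos = back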
import struct
from ZODB.POSException import POSKeyError
from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64, z64, oid_repr
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
class CorruptedError(Exception):
pass
class CorruptedDataError(CorruptedError):
def __init__(self, oid=None, buf=None, pos=None):
self.oid = oid
self.buf = buf
self.pos = pos
def __str__(self):
if self.oid:
msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
if self.pos:
msg += " at %d" % self.pos
return msg
# the struct formats for the headers
TRANS_HDR = ">8s8scHHH"
DATA_HDR = ">8s8s8s8sH8s"
# constants to support various header sizes
TRANS_HDR_LEN = 23
DATA_HDR_LEN = 42
DATA_VERSION_HDR_LEN = 58
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
class FileStorageFormatter(object):
"""Mixin class that can read and write the low-level format."""
# subclasses must provide _file
_metadata_size = 4L
_format_version = "21"
def _read_num(self, pos):
"""Read an 8-byte number."""
self._file.seek(pos)
return u64(self._file.read(8))
def _read_data_header(self, pos, oid=None):
"""Return a DataHeader object for data record at pos.
If oid is not None, raise CorruptedDataError if the oid passed
does not match the oid in the file.
If there is version data, reads the version part of the header.
If there is no pickle data, reads the back pointer.
"""
self._file.seek(pos)
s = self._file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if h.vlen:
s = self._file.read(16 + h.vlen)
h.parseVersion(s)
if not h.plen:
h.back = u64(self._file.read(8))
return h
def _write_version_header(self, file, pnv, vprev, version):
s = struct.pack(">8s8s", pnv, vprev)
file.write(s + version)
def _read_txn_header(self, pos, tid=None):
self._file.seek(pos)
s = self._file.read(TRANS_HDR_LEN)
if len(s) != TRANS_HDR_LEN:
raise CorruptedDataError(tid, s, pos)
h = TxnHeaderFromString(s)
if tid is not None and tid != h.tid:
raise CorruptedDataError(tid, s, pos)
h.user = self._file.read(h.ulen)
h.descr = self._file.read(h.dlen)
h.ext = self._file.read(h.elen)
return h
def _loadBack_impl(self, oid, back, fail):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is 1, raise KeyError for zero backpointer.
# If fail is 0, return the empty data from the record
# with no backpointer.
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
h = self._read_data_header(back)
if h.plen:
return self._file.read(h.plen), h.serial, back, h.tloc
if h.back == 0 and not fail:
return None, h.serial, back, h.tloc
back = h.back
def _loadBackTxn(self, oid, back, fail=1):
"""Return data, serial, and txn id for backpointer."""
data, serial, old, tloc = self._loadBack_impl(oid, back, fail)
self._file.seek(tloc)
h = self._file.read(TRANS_HDR_LEN)
tid = h[:8]
return data, serial, tid
def getTxnFromData(self, oid, back):
"""Return transaction id for data at back."""
h = self._read_data_header(back, oid)
self._file.seek(h.tloc)
# seek to transaction header, where tid is first 8 bytes
return self._file.read(8)
def fail(self, pos, msg, *args):
s = ("%s:%s:" + msg) % ((self._name, pos) + args)
LOG("FS pack", ERROR, s)
raise CorruptedError(s)
def checkTxn(self, th, pos):
if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s",
oid_repr(th.tid), oid_repr(self.ltid))
self.ltid = th.tid
if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set")
if not th.status in " pu": # recognize " ", "p", and "u" as valid
self.fail(pos, "invalid transaction status: %r", th.status)
if th.tlen < th.headerlen():
self.fail(pos, "invalid transaction header: "
"txnlen (%d) < headerlen(%d)", th.tlen, th.headerlen())
def checkData(self, th, tpos, dh, pos):
tend = tpos + th.tlen
if dh.tloc != tpos:
self.fail(pos, "data record does not point to transaction header"
": %d != %d", dh.tloc, tpos)
if pos + dh.recordlen() > tpos + th.tlen:
self.fail(pos, "data record size exceeds transaction size: "
"%d > %d", pos + dh.recordlen(), tpos + th.tlen)
if dh.prev >= pos:
self.fail(pos, "invalid previous pointer: %d", dh.prev)
if dh.back:
if dh.back >= pos:
self.fail(pos, "invalid back pointer: %d", dh.prev)
if dh.plen:
self.fail(pos, "data record has back pointer and data")
def DataHeaderFromString(s):
return DataHeader(*struct.unpack(DATA_HDR, s))
class DataHeader(object):
"""Header for a data record."""
__slots__ = (
"oid", "serial", "prev", "tloc", "vlen", "plen", "back",
# These three attributes are only defined when vlen > 0
"pnv", "vprev", "version")
def __init__(self, oid, serial, prev, tloc, vlen, plen):
self.back = 0 # default
self.version = "" # default
self.oid = oid
self.serial = serial
if isinstance(prev, str):
prev = u64(prev)
if isinstance(tloc, str):
tloc = u64(tloc)
self.prev = prev
self.tloc = tloc
self.vlen = vlen
if isinstance(plen, str):
plen = u64(plen)
self.plen = plen
def asString(self):
s = struct.pack(DATA_HDR, self.oid, self.serial, p64(self.prev),
p64(self.tloc), self.vlen, p64(self.plen))
if self.version:
v = struct.pack(">8s8s", p64(self.pnv), p64(self.vprev))
return s + v + self.version
else:
return s
def setVersion(self, version, pnv, vprev):
self.version = version
self.vlen = len(version)
self.pnv = pnv
self.vprev = vprev
def parseVersion(self, buf):
pnv, vprev = struct.unpack(">8s8s", buf[:16])
self.pnv = u64(pnv)
self.vprev = u64(vprev)
self.version = buf[16:]
def recordlen(self):
rlen = DATA_HDR_LEN + (self.plen or 8)
if self.version:
rlen += 16 + self.vlen
return rlen
def TxnHeaderFromString(s):
return TxnHeader(*struct.unpack(TRANS_HDR, s))
class TxnHeader(object):
"""Header for a transaction record."""
__slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
"ulen", "dlen", "elen")
def __init__(self, tid, tlen, status, ulen, dlen, elen):
self.tid = tid
self.tlen = u64(tlen)
self.status = status
self.ulen = ulen
self.dlen = dlen
self.elen = elen
def asString(self):
s = struct.pack(TRANS_HDR, self.tid, p64(self.tlen), self.status,
self.ulen, self.dlen, self.elen)
return "".join([s, self.user, self.descr, self.ext])
def headerlen(self):
return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
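# A round-trip sketch for the two header classes, assuming this module
# is importable as ZODB.FileStorage.format (matching the import in
# FileStorage.py above); the field values are arbitrary.
def _header_roundtrip():
    h = DataHeader(p64(1), p64(2), 0, 4, 0, 17)
    s = h.asString()
    assert len(s) == DATA_HDR_LEN          # no version part
    h2 = DataHeaderFromString(s)
    assert h2.oid == p64(1) and h2.plen == 17
    t = TxnHeader(p64(3), p64(64), " ", 4, 11, 0)
    t.user, t.descr, t.ext = "user", "description", ""
    assert TxnHeaderFromString(t.asString()[:TRANS_HDR_LEN]).tid == p64(3)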
=== Added File ZODB3/ZODB/FileStorage/fspack.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""FileStorage helper to perform pack.
A storage contains an ordered set of object revisions. When a storage
is packed, object revisions that are not reachable as of the pack time
are deleted. The notion of reachability is complicated by
backpointers -- object revisions that point to earlier revisions of
the same object.
An object revision is reachable at a certain time if it is reachable
from the revision of the root at that time or if it is reachable from
a backpointer after that time.
"""
# This module contains code backported from ZODB4 from the
# zodb.storage.file package. It's been edited heavily to work with
# ZODB3 code and storage layout.
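# The reachability scan described in the docstring above is a plain
# mark phase. A generic sketch; get_refs is a hypothetical callable
# mapping an oid to the oids its revision references (in this module
# that information comes from referencesf over the record's pickle).
def mark_reachable(roots, get_refs):
    seen = {}
    todo = list(roots)
    while todo:
        oid = todo.pop()
        if seen.has_key(oid):
            continue
        seen[oid] = 1
        todo.extend(get_refs(oid))
    return seen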
import os
import struct
from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64, z64
from ZODB.POSException import UndoError
from ZODB.FileStorage.format \
import FileStorageFormatter, DataHeader, TRANS_HDR_LEN, CorruptedDataError
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try:
from ZODB.fsIndex import fsIndex
except ImportError:
def fsIndex():
return {}
# Minimal logging helpers; FileStorage.py defines richer versions,
# but this module only needs warn() and error().
def warn(message, *data):
    LOG("FS pack", WARNING, message % data)
def error(message, *data):
    LOG("FS pack", ERROR, message % data)
class DataCopier(FileStorageFormatter):
"""Mixin class for copying transactions into a storage.
The restore() and pack() methods share a need to copy data records
and update pointers to data in earlier transaction records. This
class provides the shared logic.
The mixin extends the FileStorageFormatter with a copy() method.
It also requires that the concrete class provides the following
attributes:
_file -- file with earlier destination data
_tfile -- destination file for copied data
_packt -- p64() representation of latest pack time
_pos -- file pos of destination transaction
_tindex -- maps oid to data record file pos
_tvindex -- maps version name to data record file pos
_tindex and _tvindex are updated by copy().
The copy() method does not do any locking.
"""
def _txn_find(self, tid, stop_at_pack):
# _pos always points just past the last transaction
pos = self._pos
while pos > 4:
self._file.seek(pos - 8)
pos = pos - u64(self._file.read(8)) - 8
self._file.seek(pos)
h = self._file.read(TRANS_HDR_LEN)
_tid = h[:8]
if _tid == tid:
return pos
if stop_at_pack:
if h[16] == 'p':
break
raise UndoError(None, "Invalid transaction id")
def _data_find(self, tpos, oid, data):
# Return a backpointer to the data record for oid in the
# transaction at tpos.
# It should contain a pickle identical to data. Returns 0 on failure.
# Must call with lock held.
h = self._read_txn_header(tpos)
tend = tpos + h.tlen
pos = self._file.tell()
while pos < tend:
h = self._read_data_header(pos)
if h.oid == oid:
# Make sure this looks like the right data record
if h.plen == 0:
# This is also a backpointer. Gotta trust it.
return pos
if h.plen != len(data):
# The expected data doesn't match what's in the
# backpointer. Something is wrong.
error("Mismatch between data and backpointer at %d", pos)
return 0
_data = self._file.read(h.plen)
if data != _data:
return 0
return pos
pos += h.recordlen()
return 0
def _restore_pnv(self, oid, prev, version, bp):
# Find a valid pnv (previous non-version) pointer for this version.
# If there is no previous record, there can't be a pnv.
if not prev:
return None
pnv = None
h = self._read_data_header(prev, oid)
# If the previous record is for a version, it must have
# a valid pnv.
if h.version:
return h.pnv
elif bp:
# XXX Not sure the following is always true:
# The previous record is not for this version, yet we
# have a backpointer to it. The current record must
# be an undo of an abort or commit, so the backpointer
# must be to a version record with a pnv.
h2 = self._read_data_header(bp, oid)
if h2.version:
return h2.pnv
else:
warn("restore could not find previous non-version data "
"at %d or %d", prev, bp)
return None
def _resolve_backpointer(self, prev_txn, oid, data):
prev_pos = 0
if prev_txn is not None:
prev_txn_pos = self._txn_find(prev_txn, 0)
if prev_txn_pos:
prev_pos = self._data_find(prev_txn_pos, oid, data)
return prev_pos
def copy(self, oid, serial, data, version, prev_txn,
txnpos, datapos):
prev_pos = self._resolve_backpointer(prev_txn, oid, data)
old = self._index.get(oid, 0)
# Calculate the pos the record will have in the storage.
here = datapos
# And update the temp file index
self._tindex[oid] = here
if prev_pos:
# If there is a valid prev_pos, don't write data.
data = None
if data is None:
dlen = 0
else:
dlen = len(data)
# Write the recovery data record
h = DataHeader(oid, serial, old, txnpos, len(version), dlen)
if version:
h.version = version
pnv = self._restore_pnv(oid, old, version, prev_pos)
if pnv is not None:
h.pnv = pnv
else:
h.pnv = old
# Link to the last record for this version
h.vprev = self._tvindex.get(version, 0)
if not h.vprev:
h.vprev = self._vindex.get(version, 0)
self._tvindex[version] = here
self._tfile.write(h.asString())
# Write the data or a backpointer
if data is None:
if prev_pos:
self._tfile.write(p64(prev_pos))
else:
# Write a zero backpointer, which indicates an
# un-creation transaction.
self._tfile.write(z64)
else:
self._tfile.write(data)
class GC(FileStorageFormatter):
def __init__(self, file, eof, packtime):
self._file = file
self._name = file.name
self.eof = eof
self.packtime = packtime
# packpos: position of first txn header after pack time
self.packpos = None
self.oid2curpos = fsIndex() # maps oid to current data record position
self.oid2verpos = fsIndex() # maps oid to current version data
# The set of reachable revisions of each object.
#
# This set is managed using two data structures. The first is
# an fsIndex mapping oids to one data record pos. Since only
# a few objects will have more than one revision, we use this
# efficient data structure to handle the common case. The
# second is a dictionary mapping oids to lists of positions;
# it is used to handle the small number of objects for which
# we must keep multiple revisions. (The sketch after this
# class shows the matching insert logic.)
self.reachable = fsIndex()
self.reach_ex = {}
# keep ltid for consistency checks during initial scan
self.ltid = z64
def isReachable(self, oid, pos):
"""Return 1 if revision of `oid` at `pos` is reachable."""
rpos = self.reachable.get(oid)
if rpos is None:
return 0
if rpos == pos:
return 1
return pos in self.reach_ex.get(oid, [])
def findReachable(self):
self.buildPackIndex()
self.findReachableAtPacktime([z64])
self.findReachableFromFuture()
# These mappings are no longer needed and may consume a lot
# of space.
del self.oid2verpos
del self.oid2curpos
def buildPackIndex(self):
pos = 4L
while pos < self.eof:
th = self._read_txn_header(pos)
if th.tid > self.packtime:
break
self.checkTxn(th, pos)
tpos = pos
end = pos + th.tlen
pos += th.headerlen()
while pos < end:
dh = self._read_data_header(pos)
self.checkData(th, tpos, dh, pos)
if dh.version:
self.oid2verpos[dh.oid] = pos
else:
self.oid2curpos[dh.oid] = pos
pos += dh.recordlen()
tlen = self._read_num(pos)
if tlen != th.tlen:
self.fail(pos, "redundant transaction length does not "
"match initial transaction length: %d != %d",
tlen, th.tlen)
pos += 8
self.packpos = pos
def findReachableAtPacktime(self, roots):
"""Mark all objects reachable from the oids in roots as reachable."""
todo = list(roots)
while todo:
oid = todo.pop()
if self.reachable.has_key(oid):
continue
L = []
pos = self.oid2curpos.get(oid)
if pos is not None:
L.append(pos)
todo.extend(self.findrefs(pos))
pos = self.oid2verpos.get(oid)
if pos is not None:
L.append(pos)
todo.extend(self.findrefs(pos))
if not L:
continue
pos = L.pop()
self.reachable[oid] = pos
if L:
self.reach_ex[oid] = L
def findReachableFromFuture(self):
# In this pass, the roots are positions of object revisions.
# We add a pos to extra_roots when there is a backpointer to a
# revision that was not current at the packtime. The
# non-current revision could refer to objects that were
# otherwise unreachable at the packtime.
extra_roots = []
pos = self.packpos
while pos < self.eof:
th = self._read_txn_header(pos)
self.checkTxn(th, pos)
tpos = pos
end = pos + th.tlen
pos += th.headerlen()
while pos < end:
dh = self._read_data_header(pos)
self.checkData(th, tpos, dh, pos)
if dh.back and dh.back < self.packpos:
if self.reachable.has_key(dh.oid):
L = self.reach_ex.setdefault(dh.oid, [])
if dh.back not in L:
L.append(dh.back)
extra_roots.append(dh.back)
else:
self.reachable[dh.oid] = dh.back
if dh.version and dh.pnv:
if self.reachable.has_key(dh.oid):
L = self.reach_ex.setdefault(dh.oid, [])
if dh.pnv not in L:
L.append(dh.pnv)
extra_roots.append(dh.pnv)
else:
self.reachable[dh.oid] = dh.pnv
pos += dh.recordlen()
tlen = self._read_num(pos)
if tlen != th.tlen:
self.fail(pos, "redundant transaction length does not "
"match initial transaction length: %d != %d",
tlen, th.tlen)
pos += 8
for pos in extra_roots:
refs = self.findrefs(pos)
self.findReachableAtPacktime(refs)
def findrefs(self, pos):
"""Return a list of oids referenced as of packtime."""
dh = self._read_data_header(pos)
# Chase backpointers until we get to the record with the refs
while dh.back:
dh = self._read_data_header(dh.back)
if dh.plen:
return referencesf(self._file.read(dh.plen))
else:
return []
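# The two-structure bookkeeping described in GC.__init__ has a
# matching insert operation. A hypothetical helper (not part of this
# module) that is the counterpart of GC.isReachable():
def add_reachable(reachable, reach_ex, oid, pos):
    # The first position recorded for an oid goes into the fsIndex;
    # any further positions go into the overflow dict of lists.
    if not reachable.has_key(oid):
        reachable[oid] = pos
    else:
        L = reach_ex.setdefault(oid, [])
        if pos not in L:
            L.append(pos)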
class PackCopier(DataCopier):
# PackCopier has to cope with _file and _tfile being the
# same file. The copy() implementation is written assuming
# that they are different, so that using one object doesn't
# mess up the file pointer for the other object.
# PackCopier overrides _resolve_backpointer() and _restore_pnv()
# to guarantee that they keep the file pointer for _tfile in
# the right place.
def __init__(self, f, index, vindex, tindex, tvindex):
self._file = f
self._tfile = f
self._index = index
self._vindex = vindex
self._tindex = tindex
self._tvindex = tvindex
self._pos = None
def setTxnPos(self, pos):
self._pos = pos
def _resolve_backpointer(self, prev_txn, oid, data):
pos = self._tfile.tell()
try:
return DataCopier._resolve_backpointer(self, prev_txn, oid, data)
finally:
self._tfile.seek(pos)
def _restore_pnv(self, oid, prev, version, bp):
pos = self._tfile.tell()
try:
return DataCopier._restore_pnv(self, oid, prev, version, bp)
finally:
self._tfile.seek(pos)
class FileStoragePacker(FileStorageFormatter):
def __init__(self, path, stop, la, lr, cla, clr):
self._name = path
self._file = open(path, "rb")
self._stop = stop
self._packt = None
self.locked = 0
self._file.seek(0, 2)
self.file_end = self._file.tell()
self._file.seek(0)
self.gc = GC(self._file, self.file_end, self._stop)
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
self._lock_acquire = la
self._lock_release = lr
self._commit_lock_acquire = cla
self._commit_lock_release = clr
# The packer will use several indexes.
# index: oid -> pos
# vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn
# oid2serial: not used by the packer
self.index = fsIndex()
self.vindex = {}
self.tindex = {}
self.tvindex = {}
self.oid2serial = {}
self.toid2serial = {}
self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
self.nvindex = fsIndex()
def pack(self):
# Pack copies all data reachable at the pack time or later.
#
# Copying occurs in two phases. In the first phase, txns
# before the pack time are copied if they contain any reachable
# data. In the second phase, all txns after the pack time
# are copied.
#
# Txn and data records contain pointers to previous records.
# Because these pointers are stored as file offsets, they
# must be updated when we copy data.
# XXX Need to add sanity checking to pack
self.gc.findReachable()
# Setup the destination file and copy the metadata.
# XXX rename from _tfile to something clearer
self._tfile = open(self._name + ".pack", "w+b")
self._file.seek(0)
self._tfile.write(self._file.read(self._metadata_size))
self._copier = PackCopier(self._tfile, self.index, self.vindex,
self.tindex, self.tvindex)
ipos, opos = self.copyToPacktime()
assert ipos == self.gc.packpos
if ipos == opos:
# pack didn't free any data. there's no point in continuing.
self._tfile.close()
os.remove(self._name + ".pack")
return None
self._commit_lock_acquire()
self.locked = 1
self._lock_acquire()
try:
self._file.seek(0, 2)
self.file_end = self._file.tell()
finally:
self._lock_release()
if ipos < self.file_end:
self.copyRest(ipos)
# OK, we've copied everything. Now we need to wrap things up.
pos = self._tfile.tell()
self._tfile.flush()
self._tfile.close()
self._file.close()
return pos
def copyToPacktime(self):
offset = 0L # the amount of space freed by packing
pos = self._metadata_size
new_pos = pos
while pos < self.gc.packpos:
th = self._read_txn_header(pos)
new_tpos, pos = self.copyDataRecords(pos, th)
if new_tpos:
new_pos = self._tfile.tell() + 8
tlen = new_pos - new_tpos - 8
# Update the transaction length
self._tfile.seek(new_tpos + 8)
self._tfile.write(p64(tlen))
self._tfile.seek(new_pos - 8)
self._tfile.write(p64(tlen))
tlen = self._read_num(pos)
if tlen != th.tlen:
self.fail(pos, "redundant transaction length does not "
"match initial transaction length: %d != %d",
tlen, th.tlen)
pos += 8
return pos, new_pos
def fetchBackpointer(self, oid, back):
"""Return data and refs backpointer `back` to object `oid.
If `back` is 0 or ultimately resolves to 0, return None
and None. In this case, the transaction undoes the object
creation.
"""
if back == 0:
return None
data, serial, tid = self._loadBackTxn(oid, back, 0)
return data
def copyDataRecords(self, pos, th):
"""Copy any current data records between pos and tend.
Returns position of txn header in output file and position
of next record in the input file.
If any data records are copied, also write txn header (th).
"""
copy = 0
new_tpos = 0L
tend = pos + th.tlen
pos += th.headerlen()
while pos < tend:
h = self._read_data_header(pos)
if not self.gc.isReachable(h.oid, pos):
pos += h.recordlen()
continue
pos += h.recordlen()
# If we are going to copy any data, we need to copy
# the transaction header. Note that we will need to
# patch up the transaction length when we are done.
if not copy:
th.status = "p"
s = th.asString()
new_tpos = self._tfile.tell()
self._tfile.write(s)
new_pos = new_tpos + len(s)
copy = 1
if h.plen:
data = self._file.read(h.plen)
else:
# If a current record has a backpointer, fetch
# refs and data from the backpointer. We need
# to write the data in the new record.
data = self.fetchBackpointer(h.oid, h.back)
self.writePackedDataRecord(h, data, new_tpos)
new_pos = self._tfile.tell()
return new_tpos, pos
def writePackedDataRecord(self, h, data, new_tpos):
# Update the header to reflect current information, then write
# it to the output file.
if data is None:
data = ""
h.prev = 0
h.back = 0
h.plen = len(data)
h.tloc = new_tpos
pos = self._tfile.tell()
if h.version:
h.pnv = self.index.get(h.oid, 0)
h.vprev = self.vindex.get(h.version, 0)
self.vindex[h.version] = pos
self.index[h.oid] = pos
if h.version:
self.vindex[h.version] = pos
self._tfile.write(h.asString())
self._tfile.write(data)
if not data:
# Packed records never have backpointers (?).
# If there is no data, write a z64 backpointer.
# This is a George Bailey event.
self._tfile.write(z64)
def copyRest(self, ipos):
# After the pack time, all data records are copied.
# Copy one txn at a time, using copy() for data.
# Release the commit lock every 20 copies
self._lock_counter = 0
try:
while 1:
ipos = self.copyOne(ipos)
except CorruptedDataError, err:
# The last call to copyOne() will raise
# CorruptedDataError, because it will attempt to read past
# the end of the file. Double-check that the exception
# occurred for this reason.
self._file.seek(0, 2)
endpos = self._file.tell()
if endpos != err.pos:
raise
def copyOne(self, ipos):
# The call below will raise CorruptedDataError at EOF.
th = self._read_txn_header(ipos)
self._lock_counter += 1
if self._lock_counter % 20 == 0:
self._commit_lock_release()
pos = self._tfile.tell()
self._copier.setTxnPos(pos)
self._tfile.write(th.asString())
tend = ipos + th.tlen
ipos += th.headerlen()
while ipos < tend:
h = self._read_data_header(ipos)
ipos += h.recordlen()
prev_txn = None
if h.plen:
data = self._file.read(h.plen)
else:
data = self.fetchBackpointer(h.oid, h.back)
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
self._copier.copy(h.oid, h.serial, data, h.version,
prev_txn, pos, self._tfile.tell())
tlen = self._tfile.tell() - pos
assert tlen == th.tlen
self._tfile.write(p64(tlen))
ipos += 8
self.index.update(self.tindex)
self.tindex.clear()
self.vindex.update(self.tvindex)
self.tvindex.clear()
if self._lock_counter % 20 == 0:
self._commit_lock_acquire()
return ipos
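# A hedged usage sketch. In practice FileStorage.pack() drives this
# class under its own locks; the lock objects, the raw 8-byte pack
# time, and the final rename below are assumptions about that caller.
def pack_storage(path="Data.fs", days=1):
    import threading, time
    from ZODB.TimeStamp import TimeStamp
    lock = threading.Lock()
    commit_lock = threading.Lock()
    t = time.time() - days * 86400
    # repr() of a TimeStamp is its raw 8-byte form, as pack() expects
    stop = repr(TimeStamp(*(time.gmtime(t)[:5] + (t % 60,))))
    packer = FileStoragePacker(path, stop,
                               lock.acquire, lock.release,
                               commit_lock.acquire, commit_lock.release)
    opos = packer.pack()
    if opos is not None:
        # pack() leaves the packed data in path + ".pack"; the caller
        # swaps it into place and then releases the commit lock.
        os.rename(path + ".pack", path)
    if packer.locked:
        commit_lock.release()
    return opos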