[Zodb-checkins] CVS: Zope3/src/zodb/storage - fsindex.py:1.3 fsdump.py:1.3 file.py:1.6 base.py:1.10
Jeremy Hylton
jeremy@zope.com
Fri, 24 Jan 2003 18:21:28 -0500
Update of /cvs-repository/Zope3/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv31712/zodb/storage
Modified Files:
fsindex.py fsdump.py file.py base.py
Log Message:
Merge new-pickle-branch to trunk. Yee ha!
=== Zope3/src/zodb/storage/fsindex.py 1.2 => 1.3 ===
--- Zope3/src/zodb/storage/fsindex.py:1.2 Wed Dec 25 09:12:19 2002
+++ Zope3/src/zodb/storage/fsindex.py Fri Jan 24 18:20:52 2003
@@ -11,9 +11,8 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Implement an OID to File-position (long integer) mapping
-"""
-#
+"""Implement an OID to File-position (long integer) mapping."""
+
# To save space, we do two things:
#
# 1. We split the keys (OIDS) into 6-byte prefixes and 2-byte suffixes.
@@ -34,10 +33,11 @@
# bytes back before using u64 to convert the data back to (long)
# integers.
-from zodb.btrees._fsBTree import fsBTree as _fsBTree
-
+from __future__ import generators
import struct
+from zodb.btrees._fsBTree import fsBTree as _fsBTree
+
# convert between numbers and six-byte strings
_t32 = 1L<< 32
@@ -104,6 +104,11 @@
def clear(self):
self._data.clear()
+
+ def __iter__(self):
+ for prefix, tree in self._data.items():
+ for suffix in tree:
+ yield prefix + suffix
def keys(self):
r = []
=== Zope3/src/zodb/storage/fsdump.py 1.2 => 1.3 ===
--- Zope3/src/zodb/storage/fsdump.py:1.2 Wed Dec 25 09:12:19 2002
+++ Zope3/src/zodb/storage/fsdump.py Fri Jan 24 18:20:52 2003
@@ -11,10 +11,11 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
+"""A low-level utility to dump the internal FileStorage representation."""
import struct
-from zodb.storage.file import TRANS_HDR, TRANS_HDR_LEN
-from zodb.storage.file import DATA_HDR, DATA_HDR_LEN
+from zodb.storage.file \
+ import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
from zodb.utils import u64
def fmt(p64):
@@ -32,9 +33,11 @@
self.dest = dest
def dump(self):
- fid = self.file.read(4)
+ fid = self.file.read(1024)
print >> self.dest, "*" * 60
- print >> self.dest, "file identifier: %r" % fid
+ print >> self.dest, "file identifier: %r" % fid[:4]
+ print >> self.dest, "database version: %r" % fid[4:8]
+ # XXX perhaps verify that the rest of the metadata is nulls?
while self.dump_txn():
pass
@@ -91,3 +94,7 @@
if not dlen:
sbp = self.file.read(8)
print >> self.dest, "backpointer: %d" % u64(sbp)
+
+if __name__ == "__main__":
+ import sys
+ Dumper(sys.argv[1]).dump()
=== Zope3/src/zodb/storage/file.py 1.5 => 1.6 ===
--- Zope3/src/zodb/storage/file.py:1.5 Wed Jan 15 18:28:03 2003
+++ Zope3/src/zodb/storage/file.py Fri Jan 24 18:20:52 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -15,10 +15,20 @@
Files are arranged as follows.
- - The first 4 bytes are a file identifier.
+ - The first 1024 bytes are a storage metadata section.
- - The rest of the file consists of a sequence of transaction
- "records".
+ In this section, the first two bytes are the characters F and S.
+
+ The next two bytes are a storage format version id, currently "41".
+
+ The next section is a four-byte database version string, encoded as
+ byte 0: major version number
+ byte 1: minor version number
+ byte 2: micro version number
+ byte 3: release level + serialno
+ (see zodb.storage.base for details)
+
+ The rest of the section is reserved.
A transaction record consists of:
@@ -134,7 +144,7 @@
fsync = None
import zodb.db
-from zodb.storage import base
+from zodb.storage.base import BaseStorage, TransactionRecord, DataRecord
from zodb import conflict
from zodb import interfaces
from zodb.interfaces import UndoError, POSKeyError, MultipleUndoErrors
@@ -155,9 +165,7 @@
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
-packed_version = 'FS40'
-
-logger = logging.getLogger("zodb.storage.file.%s" % packed_version)
+logger = logging.getLogger("zodb.storage.file")
warn = logger.warn
error = logger.error
@@ -213,6 +221,205 @@
"""Mixin class that can read and write the low-level format."""
# subclasses must provide _file
+
+ def _read_index(self, index, vindex, tindex, stop='\377'*8,
+ ltid=z64, start=None, maxoid=z64, recover=0, read_only=0):
+ """Scan the entire file storage and recreate the index.
+
+ Returns file position, max oid, and last transaction id. It also
+ stores index information in the three dictionary arguments.
+
+ Arguments:
+ index -- dictionary, oid -> file position of data record
+ vindex -- dictionary, version -> file position of data record
+ tindex -- dictionary, oid -> file position of data record
+ XXX tindex is cleared before return, so it will be empty
+
+ There are several default arguments that affect the scan or the
+ return values. XXX should document them.
+
+ The file position returned is the position just after the last
+ valid transaction record. The oid returned is the maximum object
+ id in the data. The transaction id is the tid of the last
+ transaction.
+ """
+ self._file.seek(0, 2)
+ file_size = self._file.tell()
+ self._file.seek(0)
+
+ if start is None:
+ start = self._metadata_size
+
+ if file_size:
+ if file_size < start:
+ raise FileStorageFormatError(self._file.name)
+ self._read_metadata()
+ else:
+ if not read_only:
+ self._write_metadata()
+ return self._metadata_size, maxoid, ltid
+
+ pos = start
+ self._file.seek(start)
+ tid = '\0' * 7 + '\1'
+
+ while 1:
+ # Read the transaction record
+ h = self._file.read(TRANS_HDR_LEN)
+ if not h:
+ break
+ if len(h) != TRANS_HDR_LEN:
+ if not read_only:
+ warn('%s truncated at %s', self._file.name, pos)
+ self._file.seek(pos)
+ self._file.truncate()
+ break
+
+ tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
+ if el < 0:
+ el = t32 - el
+
+ if tid <= ltid:
+ warn("%s time-stamp reduction at %s", self._file.name, pos)
+ ltid = tid
+
+ if pos+(tl+8) > file_size or status=='c':
+ # Hm, the data were truncated or the checkpoint flag
+ # wasn't cleared. They may also be corrupted, in
+ # which case, we don't want to totally lose the data.
+ if not read_only:
+ warn("%s truncated, possibly due to damaged records at %s",
+ name, pos)
+ _truncate(self._file, self._file.name, pos)
+ break
+
+ if status not in ' up':
+ warn('%s has invalid status, %s, at %s',
+ self._file.name, status, pos)
+
+ if tl < (TRANS_HDR_LEN+ul+dl+el):
+ # We're in trouble. Find out if this is bad data in
+ # the middle of the file, or just a turd that Win 9x
+ # dropped at the end when the system crashed. Skip to
+ # the end and read what should be the transaction
+ # length of the last transaction.
+ self._file.seek(-8, 2)
+ rtl = u64(self._file.read(8))
+ # Now check to see if the redundant transaction length is
+ # reasonable:
+ if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
+ nearPanic('%s has invalid transaction header at %s',
+ self._file.name, pos)
+ if not read_only:
+ warn("It appears that there is invalid data at the "
+ "end of the file, possibly due to a system "
+ "crash. %s truncated to recover from bad data "
+ "at end.", self._file.name)
+ _truncate(file, name, pos)
+ break
+ else:
+ if recover:
+ return pos, None, None
+ panic('%s has invalid transaction header at %s',
+ self._file.name, pos)
+
+ if tid >= stop:
+ break
+
+ tpos = pos
+ tend = tpos + tl
+
+ if status == 'u':
+ # Undone transaction, skip it
+ self._file.seek(tend)
+ h = self._file.read(8)
+ if h != stl:
+ if recover: return tpos, None, None
+ panic('%s has inconsistent transaction length at %s',
+ name, pos)
+ pos = tend + 8
+ continue
+
+ pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
+ while pos < tend:
+ # Read the data records for this transaction
+ h = self._read_data_header(pos)
+ dlen = DATA_HDR_LEN + (h.plen or 8)
+ tindex[h.oid] = pos
+
+ if h.version:
+ vindex[h.version] = pos
+ dlen += 16 + len(h.version)
+
+ if pos + dlen > tend or h.tloc != tpos:
+ if recover:
+ return tpos, None, None
+ panic("%s data record exceeds transaction record at %s",
+ self._file.name, pos)
+
+ if index.get(h.oid, 0) != h.prev:
+ if h.prev:
+ if recover:
+ return tpos, None, None
+ error("%s incorrect previous pointer at %s: "
+ "index says %r record says %r",
+ self._file.name, pos, index.get(h.oid), h.prev)
+
+ pos += dlen
+
+ if pos != tend:
+ if recover:
+ return tpos, None, None
+ panic("%s data records don't add up at %s",
+ self._file.name, tpos)
+
+ # Read the (intentionally redundant) transaction length
+ self._file.seek(pos)
+ l = u64(self._file.read(8))
+ if l != tl:
+ if recover:
+ return tpos, None, None
+ panic("%s redundant transaction length check failed at %s",
+ self._file.name, pos)
+ pos += 8
+
+ if tindex: # avoid the pathological empty transaction case
+ _maxoid = max(tindex.keys()) # in 2.2, just max(tindex)
+ maxoid = max(_maxoid, maxoid)
+ index.update(tindex)
+ tindex.clear()
+
+ return pos, maxoid, ltid
+
+ _metadata_size = 1024
+ _format_version = "41"
+
+ def _read_metadata(self):
+ # Read the 1K metadata block at the beginning of the storage.
+ self._file.seek(0)
+ fs = self._file.read(2)
+ if fs != "FS":
+ raise FileStorageFormatError(self._file.name)
+ fsver = self._file.read(2)
+ if fsver != self._format_version:
+ raise FileStorageFormatError(self._file.name)
+ ver = self._file.read(4)
+ if ver != "\0" * 4:
+ self._version = ver
+
+ def _write_metadata(self):
+ # Write the 1K metadata block at the beginning of the storage.
+ self._file.seek(0)
+ self._file.write("FS")
+ self._file.write(self._format_version)
+ # If self._version is not yet set, write all zeros.
+ if self._version is not None:
+ self._file.write(self._version)
+ else:
+ self._file.write("\0" * 4)
+ # Fill the rest with null bytes
+ self._file.write("\0" * (self._metadata_size - 8))
+
def _read_data_header(self, pos, oid=None):
self._file.seek(pos)
s = self._file.read(DATA_HDR_LEN)
@@ -274,8 +481,7 @@
# seek to transaction header, where tid is first 8 bytes
return self._file.read(8)
-class FileStorage(base.BaseStorage,
- FileStorageFormatter,
+class FileStorage(BaseStorage, FileStorageFormatter,
conflict.ConflictResolvingStorage):
# default pack time is 0
_packt = z64
@@ -332,22 +538,19 @@
os.remove(file_name)
self._clear_index()
self._file = open(file_name, 'w+b')
- self._file.write(packed_version)
+ self._write_metadata()
r = self._restore_index()
if r is not None:
index, vindex, start, maxoid, ltid = r
self._initIndex(index, vindex)
- self._pos, self._oid, tid = read_index(
- self._file, file_name, self._index, self._vindex,
- self._tindex, stop, ltid=ltid, start=start, maxoid=maxoid,
- read_only=read_only,
- )
+ self._pos, self._oid, tid = self._read_index(
+ self._index, self._vindex, self._tindex, stop, ltid=ltid,
+ start=start, maxoid=maxoid, read_only=read_only)
else:
- self._pos, self._oid, tid = read_index(
- self._file, file_name, self._index, self._vindex,
- self._tindex, stop, read_only=read_only,
- )
+ self._pos, self._oid, tid = self._read_index(
+ self._index, self._vindex, self._tindex, stop,
+ read_only=read_only)
self._ltid = tid
# self._pos should always point just past the last
@@ -436,8 +639,7 @@
if index is None or pos is None or oid is None or vindex is None:
return None
- # if pos == 4, then the storage is empty.
- if pos > 4:
+ if pos > self._metadata_size: # otherwise storage is empty
# Get the last transaction
self._file.seek(pos - 8)
tl = u64(self._file.read(8))
@@ -461,6 +663,11 @@
# XXX should log the error, though
pass # We don't care if this fails.
+ def setVersion(self, version):
+ self._version = version
+ if not self._is_read_only:
+ self._write_metadata()
+
def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1)
@@ -1437,10 +1644,9 @@
vindex = {}
tindex = {}
tvindex = {}
- packpos, maxoid, ltid = read_index(self._file, self._name, index,
- vindex, tindex, self._stop,
- read_only=1)
- if packpos == 4:
+ packpos, maxoid, ltid = self._read_index(index, vindex, tindex,
+ self._stop, read_only=1)
+ if packpos == self._metadata_size:
return
if self._redundant_pack(packpos):
return
@@ -1460,8 +1666,10 @@
ofile = open(self._name + '.pack', 'w+b')
pv = z64
offset = 0L # the amount of space freed by packing
- pos = opos = 4L
- ofile.write(packed_version)
+ pos = opos = self._metadata_size
+ # Copy the metadata from the old file to the new one.
+ self._file.seek(0)
+ ofile.write(self._file.read(self._metadata_size))
# Copy the data in two stages. In the packing stage, we skip
# records that are non-current or that are for unreferenced
@@ -1838,7 +2046,7 @@
-def read_index(file, name, index, vindex, tindex, stop='\377'*8,
+def Xread_index(file, name, index, vindex, tindex, stop='\377'*8,
ltid=z64, start=4L, maxoid=z64, recover=0, read_only=0):
"""Scan the entire file storage and recreate the index.
@@ -2052,11 +2260,10 @@
if isinstance(file, StringType):
file = open(file, 'rb')
self._file = file
- if file.read(4) != packed_version:
- raise FileStorageFormatError, file.name
+ self._read_metadata()
file.seek(0,2)
self._file_size = file.tell()
- self._pos = 4L
+ self._pos = self._metadata_size
assert start is None or isinstance(start, StringType)
assert stop is None or isinstance(stop, StringType)
if start:
@@ -2194,7 +2401,7 @@
raise IndexError, index
-class RecordIterator(Iterator, FileStorageFormatter, base.TransactionRecord):
+class RecordIterator(Iterator, FileStorageFormatter, TransactionRecord):
"""Iterate over the transactions in a FileStorage file."""
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
@@ -2243,7 +2450,7 @@
raise IndexError, index
-class Record(base.DataRecord):
+class Record(DataRecord):
"""An abstract database record."""
def __init__(self, *args):
self.oid, self.serial, self.version, self.data, self.data_txn = args
=== Zope3/src/zodb/storage/base.py 1.9 => 1.10 ===
--- Zope3/src/zodb/storage/base.py:1.9 Fri Jan 24 13:51:19 2003
+++ Zope3/src/zodb/storage/base.py Fri Jan 24 18:20:52 2003
@@ -1,3 +1,4 @@
+
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
@@ -11,7 +12,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
-
"""Handy standard storage machinery
$Id$
@@ -23,6 +23,7 @@
import time
import errno
import shutil
+import struct
import threading
from types import StringTypes
import logging
@@ -51,7 +52,6 @@
GBYTES = 1024 * 1024 * 1000
JOIN_TIME = 10
-
class PackStop(Exception):
"""Escape hatch for pack operations."""
@@ -62,6 +62,7 @@
_serial = ZERO # Transaction serial number
_tstatus = ' ' # Transaction status, used for copying data
_is_read_only = False
+ _version = None
def __init__(self, name, base=None):
self._name = name
@@ -103,6 +104,12 @@
def getName(self):
return self._name
+ def getVersion(self):
+ return self._version
+
+ def setVersion(self, version):
+ self._version = version
+
def history(self, oid, version, length=1):
pass
@@ -640,6 +647,23 @@
BSDDB transaction.
"""
self._transaction.abort()
+
+ def _clear_temp(self):
+ """Called from BaseStorage.tpc_abort(), BaseStorage.tpc_begin(),
+ BaseStorage.tpc_finish(), this clears out the temporary log file
+ """
+ # BAW: no-op this since the right CommitLog file operations are
+ # performed by the methods in the derived storage class.
+ pass
+
+ def _setVersion(self, txn, vstr):
+ self._info.put('dbversion', vstr, txn=txn)
+
+ def setVersion(self, version):
+ self._withtxn(self._setVersion, version)
+
+ def getVersion(self):
+ return self._info.get('dbversion')
def close(self):
"""Close the storage.