[Zodb-checkins] CVS: Zope3/src/zodb/storage - interfaces.py:1.5.4.3 fsdump.py:1.3.8.5 file.py:1.8.4.13
Jeremy Hylton
jeremy@zope.com
Wed, 12 Mar 2003 16:12:07 -0500
Update of /cvs-repository/Zope3/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv19187/storage
Modified Files:
Tag: opaque-pickles-branch
interfaces.py fsdump.py file.py
Log Message:
Fix the last remaining problems with undo and versions.
Remove max argument from versions() YAGNI!
Refactor a lot more code to use _read_data_header().
Twiddle tests to add some txn notes for readability.
=== Zope3/src/zodb/storage/interfaces.py 1.5.4.2 => 1.5.4.3 ===
--- Zope3/src/zodb/storage/interfaces.py:1.5.4.2 Mon Mar 10 14:38:45 2003
+++ Zope3/src/zodb/storage/interfaces.py Wed Mar 12 16:12:00 2003
@@ -198,8 +198,8 @@
def versionEmpty(version):
pass
- def versions(max=None):
- pass
+ def versions():
+ pass
class IStorageIterator(Interface):
=== Zope3/src/zodb/storage/fsdump.py 1.3.8.4 => 1.3.8.5 ===
--- Zope3/src/zodb/storage/fsdump.py:1.3.8.4 Tue Mar 11 18:33:47 2003
+++ Zope3/src/zodb/storage/fsdump.py Wed Mar 12 16:12:00 2003
@@ -17,6 +17,7 @@
from zodb.storage.file \
import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
from zodb.utils import u64
+from zodb.storage.base import splitrefs
from zodb.storage.tests.base import zodb_unpickle
def fmt(p64):
@@ -91,7 +92,8 @@
print >> self.dest, \
"previous version data offset: %d" % u64(sprevdata)
print >> self.dest, 'numrefs:', nrefs
- self.file.read(nrefs * 8)
+ for ref in splitrefs(self.file.read(nrefs * 8)):
+ print >> self.dest, '\t%s' % fmt(refs)
# XXX print out the oids?
print >> self.dest, "len(data): %d" % dlen
data = self.file.read(dlen)
=== Zope3/src/zodb/storage/file.py 1.8.4.12 => 1.8.4.13 ===
--- Zope3/src/zodb/storage/file.py:1.8.4.12 Wed Mar 12 12:52:54 2003
+++ Zope3/src/zodb/storage/file.py Wed Mar 12 16:12:00 2003
@@ -151,13 +151,13 @@
from zodb.storage.base import BaseStorage, splitrefs
from zodb import conflict
from zodb import interfaces
-from zodb.interfaces import UndoError, POSKeyError, MultipleUndoErrors
+from zodb.interfaces import _fmt_oid
+from zodb.interfaces import *
from zodb.timestamp import TimeStamp, newTimeStamp, timeStampFromTime
from zodb.lockfile import lock_file
from zodb.utils import p64, u64, cp
from zodb.storage.fsindex import fsIndex
from zodb.storage.interfaces import *
-from zodb.interfaces import ITransactionAttrs, ZERO, MAXTID
t32 = 1L << 32
# the struct formats for the headers
@@ -207,7 +207,8 @@
def __str__(self):
if self.oid:
- msg = "Error reading oid %016x. Found %r" % (self.oid, self.buf)
+ msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
+ self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
if self.pos:
@@ -428,6 +429,14 @@
self._file.write("\0" * (self._metadata_size - 8))
def _read_data_header(self, pos, oid=None):
+ """Return a DataHeader object for data record at pos.
+
+ If ois is not None, raise CorruptedDataError if oid passed
+ does not match oid in file.
+
+ If there is version data, reads the version part of the header.
+ If there is no pickle data, reads the back pointer.
+ """
self._file.seek(pos)
s = self._file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
@@ -491,6 +500,12 @@
# seek to transaction header, where tid is first 8 bytes
return self._file.read(8)
+class TxnTempFile(FileStorageFormatter):
+ """Helper class used in conjunction with _tfile of FileStorage."""
+
+ def __init__(self, afile):
+ self._file = afile
+
class FileStorage(BaseStorage, FileStorageFormatter,
conflict.ConflictResolvingStorage):
# default pack time is 0
@@ -523,6 +538,7 @@
if not read_only:
self._lock()
self._tfile = open(file_name + '.tmp', 'w+b')
+ self._tfmt = TxnTempFile(self._tfile)
else:
self._tfile = None
@@ -899,8 +915,6 @@
while pos < tend:
h = self._read_data_header(pos)
if h.oid == oid:
- if h.version:
- self._file.read(h.vlen + 16)
# Read past any references data
self._file.read(h.nrefs * 8)
# Make sure this looks like the right data record
@@ -917,7 +931,6 @@
return 0
return pos
pos += h.recordlen()
- self._file.seek(pos)
return 0
def restore(self, oid, serial, data, version, prev_txn, refs, transaction):
@@ -965,11 +978,9 @@
data = None
if data is None:
dlen = 0
- assert refs is None
refs = []
else:
dlen = len(data)
- assert refs is not None
# Write the recovery data record
self._tfile.write(pack(DATA_HDR,
oid, serial, old, self._pos,
@@ -1062,22 +1073,17 @@
return # No data in this trans
self._tfile.seek(0)
user, desc, ext = self._ude
- luser = len(user)
- ldesc = len(desc)
- lext = len(ext)
self._file.seek(self._pos)
tl = self._thl + dlen
- stl = p64(tl)
try:
# Note that we use a status of 'c', for checkpoint.
# If this flag isn't cleared, anything after this is
# suspect.
- self._file.write(pack(
- ">8s" "8s" "c" "H" "H" "H"
- ,self._serial, stl,'c', luser, ldesc, lext,
- ))
+ self._file.write(pack(">8sQcHHH",
+ self._serial, tl, "c",
+ len(user), len(desc), len(ext)))
if user:
self._file.write(user)
if desc:
@@ -1087,7 +1093,7 @@
cp(self._tfile, self._file, dlen)
- self._file.write(stl)
+ self._file.write(p64(tl))
self._file.flush()
except:
# Hm, an error occured writing out the data. Maybe the
@@ -1102,7 +1108,7 @@
nextpos = self._nextpos
if nextpos:
# Clear the checkpoint flag
- self._file.seek(self._pos+16)
+ self._file.seek(self._pos +16 )
self._file.write(self._tstatus)
self._file.flush()
@@ -1120,65 +1126,55 @@
self._file.truncate(self._pos)
self._nextpos=0
- def _undoDataInfo(self, oid, pos, tpos):
+ def getSerial(self, oid):
+ self._lock_acquire()
+ try:
+ return self._getSerial(oid, self._index[oid])
+ finally:
+ self._lock_release()
+
+ def _getSerial(self, oid, pos):
+ self._file.seek(pos)
+ h = self._file.read(16)
+ assert oid == h[:8]
+ return h[8:]
+
+ def _getVersion(self, oid, pos):
+ # Return version and non-version pointer from oid record at pos.
+ h = self._read_data_header(pos)
+ if h.oid != oid:
+ raise ValueError("invalid previous pointer")
+ if h.version:
+ return h.version, p64(h.pnv)
+ else:
+ return "", None
+
+ def _undo_get_data(self, oid, pos, tpos):
"""Return the serial, data pointer, data, and version for the oid
record at pos"""
if tpos:
- file = self._tfile
pos = tpos - self._pos - self._thl
- tpos = file.tell()
+ tpos = self._tfile.tell()
+ h = self._tfmt._read_data_header(pos)
+ afile = self._tfile
else:
- file = self._file
-
- file.seek(pos)
- h = file.read(DATA_HDR_LEN)
- roid, serial, prev, tloc, vlen, nrefs, plen = unpack(DATA_HDR, h)
- if roid != oid:
+ h = self._read_data_header(pos)
+ afile = self._file
+ if h.oid != oid:
raise UndoError(oid, 'Invalid undo transaction id')
- if vlen:
- # skip nv pointer and version previous pointer
- file.read(16)
- version = file.read(vlen)
- else:
- version = ''
-
- refsdata = file.read(nrefs * 8)
- if plen:
- data = file.read(plen)
+ refsdata = afile.read(h.nrefs * 8)
+ if h.plen:
+ data = afile.read(h.plen)
else:
data = ''
- pos = u64(file.read(8))
+ pos = h.back
if tpos:
# Restore temp file to end
- file.seek(tpos)
-
- return serial, pos, data, version
-
- def _getVersion(self, oid, pos):
- self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
- doid, serial, sprev, stloc, vlen, nrefs, splen = unpack(DATA_HDR, h)
- assert doid == oid
- if vlen:
- h = self._file.read(16)
- return self._file.read(vlen), h[:8]
- else:
- return '', ''
-
- def getSerial(self, oid):
- self._lock_acquire()
- try:
- return self._getSerial(oid, self._index[oid])
- finally:
- self._lock_release()
+ self._tfile.seek(tpos)
- def _getSerial(self, oid, pos):
- self._file.seek(pos)
- h = self._file.read(16)
- assert oid == h[:8]
- return h[8:]
+ return h.serial, pos, data, h.version
def _undo_record(self, h, pos):
"""Get the undo information for a data record
@@ -1199,14 +1195,16 @@
if tipos != pos:
# Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are.
- cserial, cdataptr, cdata, cver = self._undoDataInfo(
+ cserial, cdataptr, cdata, cver = self._undo_get_data(
h.oid, ipos, tpos)
# Versions of undone record and current record *must* match!
if cver != h.version:
raise UndoError(oid, 'Current and undone versions differ')
if cdataptr != pos:
- # We aren't sure if we are talking about the same data
+ # If the backpointers don't match, check to see if
+ # conflict resolution is possible. If not, raise
+ # UndoError.
try:
if (
# The current record wrote a new pickle
@@ -1233,6 +1231,7 @@
# is being undone.
return "", "", 0, "", "", ipos
+ # What does getVersion do?
version, snv = self._getVersion(h.oid, h.prev)
if copy:
# we can just copy our previous-record pointer forward
@@ -1342,31 +1341,26 @@
# a helper function to write the data records for transactional undo
otloc = self._pos
- here = self._pos + (self._tfile.tell() + self._thl)
+ here = self._pos + self._tfile.tell() + self._thl
# Let's move the file pointer back to the start of the txn record.
self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN)
if h[16] != ' ':
- raise UndoError(None, 'non-undoable transaction')
+ raise UndoError(None, "non-undoable transaction")
tl = u64(h[8:16])
ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
tend = tpos + tl
pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
tindex = {}
+
# keep track of failures, cause we may succeed later
failures = {}
- failed = failures.has_key
- # Read the data records for this transaction
while pos < tend:
h = self._read_data_header(pos)
- if failed(h.oid):
+ if h.oid in failures:
del failures[h.oid] # second chance!
- if h.version:
- self._file.read(16 + h.vlen)
- self._file.read(h.nrefs * 8)
-
try:
- p, refs, prev, v, snv, ipos = self._undo_record(h, pos)
+ p, refs, prev, version, snv, ipos = self._undo_record(h, pos)
except UndoError, v:
# Don't fail right away. We may be redeemed later!
failures[h.oid] = v
@@ -1374,13 +1368,17 @@
plen = len(p)
nrefs = len(refs) / 8
self._tfile.write(pack(DATA_HDR,
- h.oid, self._serial, ipos,
- otloc, len(v), nrefs, plen))
- if v:
- vprev = self._tvindex.get(v, 0) or self._vindex.get(v, 0)
- self._tfile.write(snv + p64(vprev) + v)
- self._tvindex[v] = here
- odlen = DATA_VERSION_HDR_LEN + len(v) + (plen or 8)
+ h.oid, self._serial, ipos, otloc,
+ len(version), nrefs, plen))
+ # If the backpointer refers to an object in a version,
+ # we need to write a reasonable pointer to the previous
+ # version data, which might be in _tvindex.
+ if version:
+ vprev = (self._tvindex.get(version)
+ or self._vindex.get(version))
+ self._tfile.write(snv + p64(vprev) + version)
+ self._tvindex[version] = here
+ odlen = DATA_VERSION_HDR_LEN + len(version) + (plen or 8)
else:
odlen = DATA_HDR_LEN + (plen or 8)
@@ -1406,50 +1404,24 @@
# any non-version data. This would be excruciatingly painful to
# test, so I must be right. ;)
raise interfaces.VersionError(
- 'The version must be an non-empty string')
+ "The version must be an non-empty string")
self._lock_acquire()
try:
- # XXX not sure what this code is trying to do, except that
- # the tstatus never equals 'u'
- index=self._index
- file=self._file
- seek=file.seek
- read=file.read
- srcpos=self._vindex.get(version, 0)
- t=tstatus=None
- while srcpos:
- seek(srcpos)
- oid=read(8)
- if index[oid]==srcpos: return 0
- h=read(50) # serial, prev(oid), tloc, vlen, plen, pnv, pv
- tloc=h[16:24]
- if t != tloc:
- # We haven't checked this transaction before,
- # get its status.
- t=tloc
- seek(u64(t)+16)
- tstatus=read(1)
-
- assert tstatus != 'u'
- if tstatus != 'u': return 1
-
- spos=h[-8:]
- srcpos=u64(spos)
-
- return 1
- finally: self._lock_release()
-
- def versions(self, max=None):
- r=[]
- a=r.append
- keys=self._vindex.keys()
- if max is not None: keys=keys[:max]
- for version in keys:
- if self.versionEmpty(version): continue
- a(version)
- if max and len(r) >= max: return r
+ pos = self._vindex.get(version, 0)
+ if not pos:
+ return True
+ while pos:
+ h = self._read_data_header(pos)
+ if self._index[h.oid] == pos:
+ return False
+ pos = h.vprev
+ return True
+ finally:
+ self._lock_release()
- return r
+ def versions(self):
+ return [version for version in self._vindex.keys()
+ if not self.versionEmpty(version)]
def pack(self, t):
"""Copy data from the current database file to a packed file