[Zope3-checkins] CVS: Zope3/src/zodb/storage - file.py:1.23.2.1
Jeremy Hylton
jeremy@zope.com
Thu, 10 Apr 2003 18:41:13 -0400
Update of /cvs-repository/Zope3/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv6526
Modified Files:
Tag: jeremy-new-pack-branch
file.py
Log Message:
Incomplete progress on the new pack for FileStorage.
=== Zope3/src/zodb/storage/file.py 1.23 => 1.23.2.1 ===
--- Zope3/src/zodb/storage/file.py:1.23 Wed Apr 9 14:00:43 2003
+++ Zope3/src/zodb/storage/file.py Thu Apr 10 18:41:12 2003
@@ -132,6 +132,7 @@
from __future__ import generators
import os
+import stat
import sys
import time
import errno
@@ -452,8 +453,18 @@
s = struct.pack(">QQ", pnv, vprev)
file.write(s + version)
- def _read_txn_header(self, pos):
- pass
+ def _read_txn_header(self, pos, tid=None):
+ self._file.seek(pos)
+ s = self._file.read(TRANS_HDR_LEN)
+ if len(s) != TRANS_HDR_LEN:
+ raise CorruptedDataError(tid, s, pos)
+ h = TxnHeader.fromString(s)
+ if tid is not None and tid != h.tid:
+ raise CorruptedDataError(tid, s, pos)
+ h.user = self._file.read(h.ulen)
+ h.descr = self._file.read(h.dlen)
+ h.ext = self._file.read(h.elen)
+ return h
def _loadBack_impl(self, oid, back):
# shared implementation used by various _loadBack methods
@@ -889,11 +900,8 @@
# Return backpointer to oid in data record for in transaction at tpos.
# It should contain a pickle identical to data. Returns 0 on failure.
# Must call with lock held.
- self._file.seek(tpos)
- h = self._file.read(TRANS_HDR_LEN)
- tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
- self._file.read(ul + dl + el)
- tend = tpos + tl
+ h = self._read_txn_header(tpos)
+ tend = tpos + h.tlen
pos = self._file.tell()
while pos < tend:
h = self._read_data_header(pos)
@@ -1327,14 +1335,18 @@
otloc = self._pos
here = self._pos + self._tfile.tell() + self._thl
# Let's move the file pointer back to the start of the txn record.
- self._file.seek(tpos)
- h = self._file.read(TRANS_HDR_LEN)
- if h[16] != ' ':
+ th = self._read_txn_header(tpos)
+## self._file.seek(tpos)
+## h = self._file.read(TRANS_HDR_LEN)
+## if h[16] != ' ':
+## raise UndoError(None, "non-undoable transaction")
+## tl = u64(h[8:16])
+## ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
+ if th.status != " ":
raise UndoError(None, "non-undoable transaction")
- tl = u64(h[8:16])
- ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
- tend = tpos + tl
- pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
+
+ tend = tpos + th.tlen
+ pos = tpos + th.headerlen()
tindex = {}
# keep track of failures, cause we may succeed later
@@ -1523,6 +1535,21 @@
self._commit_lock_acquire = cla
self._commit_lock_release = clr
+ # The packer will use several indexes.
+ # index: oid -> pos
+ # vindex: version -> pos of XXX
+ # tindex: oid -> pos, for current txn
+ # tvindex: version -> pos of XXX, for current txn
+
+ self.index = fsIndex()
+ self.vindex = {}
+ self.tindex = {}
+ self.tvindex = {}
+
+ # Index for non-version data. This is a temporary structure
+ # to reduce I/O during packing
+ self.nvindex = fsIndex()
+
def _loada(self, oid, _index):
"""Read any version and return the version"""
try:
@@ -1546,55 +1573,166 @@
return self._file.read(1) != ' ' # XXX or == "p"?
def _pack_index(self, index):
- # Return an index for everything reachable at the pack time.
+ """Return packpos and an index of objects reachable at the pack time.
+
+ If the storage is empty or it has been packed to a later time,
+ return None.
+ """
+ # index is a complete index as of the pack time.
+ index = fsIndex()
+ vindex = {}
+ tindex = {}
+ packpos, maxoid, ltid = self._read_index(index, vindex, tindex,
+ self._stop, read_only=1)
+ if packpos == self._metadata_size:
+ return None, None
+ if self._redundant_pack(packpos):
+ return None, None
+ del vindex, tindex
+
+ # Now traverse all objects starting from the root and add
+ # them to pindex. Any object in index but not in pindex
+ # is unreachable and should not be copied.
rootl = [ZERO]
pindex = fsIndex()
while rootl:
oid = rootl.pop()
if oid in pindex:
continue
- # XXX We might need to catch exceptions raised by _loada().
p, refs, v = self._loada(oid, index)
rootl.extend(refs)
pindex[oid] = index[oid]
- return pindex
+ return packpos, pindex
def pack(self):
+ # Pack copies all data reachable at the pack time or later.
+ #
+ # Copying occurs in two phases. In the first phase, txns
+ # before the pack time are copied if the contain any reachable
+ # data. In the second phase, all txns after the pack time
+ # are copied.
+ #
+ # Txn and data records contain pointers to previous records.
+ # Because these pointers are stored as file offsets, they
+ # must be updated when we copy data.
+
packing = True
+ file_end = os.stat(self._file.name)[stat.ST_SIZE]
+ packpos, pindex = self._pack_index()
+ assert packpos <= file_end
- ##################################################################
- # Step 1, get index as of pack time that has only referenced objects.
- index = fsIndex()
- vindex = {}
- # tindex is the index for the current transaction
- tindex = {}
- tvindex = {}
- packpos, maxoid, ltid = self._read_index(index, vindex, tindex,
- self._stop, read_only=1)
- if packpos == self._metadata_size:
- return
- if self._redundant_pack(packpos):
- return
- pindex = self._pack_index(index)
+ # Setup the destination file and copy the metadata.
+ self.ofile = open(self._name + ".pack", "w+b")
+ self._file.seek(0)
+ self.ofile.write(self._file.read(self._metadata_size))
- ##################################################################
- # Step 2, copy data and compute new index based on new positions.
- index = fsIndex()
- vindex = {}
- tindex = {}
- tvindex = {}
+ self.copyToPacktime(packpos)
+ self.copyRest()
- # Index for non-version data. This is a temporary structure
- # to reduce I/O during packing
- nvindex = fsIndex()
+ # OK, we've copied everything. Now we need to wrap things up.
+ self.ofile.flush()
+ self.ofile.close()
+ self._file.close()
+
+ # XXX probably don't need to return indexes, since caller
+ # has object
+ return opos, self.index, self.vindex, self.tindex, self.tvindex
+
+ def isCurNonversion(self, h, nvpos, curpos):
+ """Return True if h is current non-version data,
+
+ where the current data is at curpos. Always restores the file
+ to the position it was at before the call.
+ """
+ pos = self._file.tell()
+ try:
+ # XXX probably don't want to read this data everytime.
+ # instead, keep a cache of whether curpos is in a version.
+ # note that entries can be thrown out after passing curpos.
+ cur = self._read_data_header(curpos)
+ assert cur.oid == h.oid
+ if not cur.version:
+ return False
+ # If cur has a pnv, we need to chase backpointers until
+ # we get to the actual data.
+ self._file.read(cur.nrefs * 8)
+ pnv = self._loadBackPOS(cur.oid, cur.pnv)
+ return pnv == nvpos
+ finally:
+ self._file.seek(pos)
- ofile = open(self._name + '.pack', 'w+b')
- pv = ZERO
+ def copyToPacktime(self, packpos):
offset = 0L # the amount of space freed by packing
- pos = opos = self._metadata_size
- # Copy the metadata from the old file to the new one.
- self._file.seek(0)
- ofile.write(self._file.read(self._metadata_size))
+ pos = self._metadata_size
+ new_pos = pos
+
+ while pos <= packpos:
+ th = self._read_txn_header(pos)
+ copy = False
+ tend = pos + th.tlen
+ pos += th.headerlen()
+ while pos < tend:
+ h = self._read_data_header(pos)
+ cur_nonversion = False
+
+ # If this data record isn't current, don't copy it.
+ # If the current record is in a version, this may be
+ # the current non-version data.
+ curpos = pindex.get(h.oid, 0)
+ if curpos != pos:
+ if self.isCurNonversion(h, curpos):
+ cur_nonversion = True
+ else:
+ pos += h.recordlen()
+ continue
+
+ # If we are going to copy any data, we need to copy
+ # the transaction header. Note that we will need to
+ # patch up the transaction length when we are done.
+ if not copy:
+ th.status = "p"
+ s = th.asString()
+ self.ofile.write(s)
+ new_tpos = new_pos
+ new_pos += len(s)
+ copy = True
+
+ if cur_nonversion:
+ self.nvindex[h.oid] = new_pos
+
+ if h.plen:
+ refs = self._file.read(8 * h.nrefs)
+ data = self._file.read(h.plen)
+ else:
+ # If a current record has a backpointer, fetch
+ # refs and data from the backpointer. We need
+ # to write the data in the new record.
+ data, refs, serial, tid = self._loadBackTxn(h.oid, h.back)
+
+ self.writeDataRecord(h, data, refs, new_pos)
+ new_pos = self.ofile.tell()
+
+ if copy:
+ # now fix up the transaction header
+ pass
+
+
+ def writeDataRecord(self, h, data, refs, new_pos):
+ # Update the header to reflect current information, then write
+ # it to the output file.
+ h.prev = 0
+ h.back = 0
+ h.plen = len(data)
+ h.nrefs = len(refs) / 8
+ h.tloc = new_pos
+ if h.version:
+ h.pnv = self.nvindex[h.oid]
+ h.vprev = 0
+ self.ofile.write(h.asString())
+ self.ofile.write(refs)
+ self.ofile.write(data)
+
+ def XXX(self):
# Copy the data in two stages. In the packing stage, we skip
# records that are non-current or that are for unreferenced
@@ -1624,28 +1762,22 @@
self.locked = True
# Read the transaction record
- self._file.seek(pos)
- h = self._file.read(TRANS_HDR_LEN)
- if len(h) < TRANS_HDR_LEN:
+ if pos == file_end:
break
- tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
- if status == 'c':
+ th = self._read_txn_header(pos)
+ if th.status == 'c':
# Oops. we found a checkpoint flag.
break
tpos = pos
- tend = tpos + tl
+ tend = tpos + th.tlen
otpos = opos # start pos of output trans
# write out the transaction record
status = packing and 'p' or ' '
- ofile.write(h[:16] + status + h[17:])
- thl = ul + dl + el
- h = self._file.read(thl)
- if len(h) != thl:
- raise PackError(opos)
- ofile.write(h)
- thl = TRANS_HDR_LEN + thl
+ th.status = status
+ ofile.write(th.asString())
+ thl = th.headerlen()
pos = tpos + thl
opos = otpos + thl
@@ -1780,7 +1912,7 @@
# Now, maybe we need to hack or delete the transaction
otl = opos - otpos
- if otl != tl:
+ if otl != th.tlen:
# Oops, what came out is not what came in!
# Check for empty:
@@ -2139,6 +2271,15 @@
fromString = classmethod(fromString)
+ def asString(self):
+ s = struct.pack(DATA_HDR, self.oid, self.serial, self.tloc,
+ self.vlen, self.plen, self.nrefs, self.back)
+ if self.version:
+ v = struct.pack(">QQ", self.pnv, self.vprev)
+ return s + v + self.version
+ else:
+ return s
+
def parseVersion(self, buf):
self.pnv, self.vprev = struct.unpack(">QQ", buf[:16])
self.version = buf[16:]
@@ -2148,6 +2289,33 @@
if self.version:
rlen += 16 + self.vlen
return rlen
+
+class TxnHeader:
+ """Header for a transaction record."""
+
+ __slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
+ "ulen", "dlen", "elen")
+
+ def __init__(self, tid, tlen, status, ulen, dlen, elen):
+ self.tid = tid
+ self.tlen = tlen
+ self.status = status
+ self.ulen = ulen
+ self.dlen = dlen
+ self.elen = elen
+
+ def fromString(cls, s):
+ return cls(*struct.unpack(TRANS_HDR, s))
+
+ fromString = classmethod(fromString)
+
+ def asString(self):
+ s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
+ self.ulen, self.dlen, self.elen)
+ return "".join([s, self.user, self.descr, self.ext])
+
+ def headerlen(self):
+ return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
def cleanup(filename):