[Checkins] SVN: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
Checkpointing
Jim Fulton
jim at zope.com
Mon Dec 3 07:27:14 EST 2007
Log message for revision 82094:
Checkpointing
Changed:
U zc.FileStorage/dev/src/zc/FileStorage/__init__.py
-=-
Modified: zc.FileStorage/dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-03 12:27:07 UTC (rev 82093)
+++ zc.FileStorage/dev/src/zc/FileStorage/__init__.py 2007-12-03 12:27:13 UTC (rev 82094)
@@ -14,6 +14,7 @@
import ZODB.FileStorage
import ZODB.FileStorage.fspack
+import BTrees.LLBTree
class FileStorage(ZODB.FileStorage.FileStorage):
@@ -49,7 +50,7 @@
self._lock_acquire, self._lock_release,
self._commit_lock_acquire,
self._commit_lock_release,
- current_size)
+ current_size, referencesf)
try:
opos = None
try:
@@ -89,25 +90,24 @@
class FileStoragePacker(ZODB.FileStorage.fspack.FileStoragePacker):
- def __init__(self, path, stop, la, lr, cla, clr, current_size):
+ def __init__(self, path, stop, la, lr, cla, clr, current_size, referencesf):
self._name = path
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
# return point, else on Windows the caller won't be able to rename
# or remove the storage file.
- if hasattr(os, 'O_DIRECT'):
- fd = os.open(path, os.O_DIRECT)
- self._file = os.fdopen(fd, 'rb', 1<<20)
- else:
- self._file = open(path, "rb")
+ # We set the buffer quite high (32MB) to try to reduce seeks
+ # when the storage disk is doing other I/O.
+ self._file = open(path, "rb", 1<<25)
+
+
+
self._path = path
self._stop = stop
self.locked = 0
self.file_end = current_size
- self.gc = GC(self._file, self.file_end, self._stop)
-
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
@@ -118,19 +118,317 @@
# The packer will use several indexes.
# index: oid -> pos
- # vindex: version -> pos
# tindex: oid -> pos, for current txn
- # tvindex: version -> pos, for current txn
# oid2tid: not used by the packer
- self.index = fsIndex()
- self.vindex = {}
+ self.index = BTrees.fsBTree.fsIndex()
self.tindex = {}
- self.tvindex = {}
self.oid2tid = {}
self.toid2tid = {}
self.toid2tid_delete = {}
- # Index for non-version data. This is a temporary structure
- # to reduce I/O during packing
- self.nvindex = fsIndex()
+ self.referencesf = referencesf
+
+    def pack(self):
+        """Pack the storage into ``<name>.pack``.
+
+        Returns None (after closing the input file) when buildPackIndex
+        reports that every transaction up to the pack time already has
+        status "p", or when copying up to the pack time yields an output
+        position equal to the pack position (no data was freed).
+        Otherwise returns the position reported by ``self._tfile.tell()``
+        once copying has finished.
+        """
+        packed, index, references, packpos = self.buildPackIndex(
+            self._stop, self.file_end)
+        if packed:
+            # nothing to do
+            self._file.close()
+            return None
+
+        # Fold in references made after the pack time, then keep only the
+        # objects that gc() can reach.
+        self.updateReferences(references, packpos, self.file_end)
+        index = self.gc(index, references)
+
+        output = open(self._name + ".pack", "w+b", 1<<25)
+        index, new_pos = self.copyToPacktime(packpos, index, output)
+        if new_pos == packpos:
+            # pack didn't free any data.  there's no point in continuing.
+            self._file.close()
+            output.close()
+            os.remove(self._name + ".pack")
+            return None
+
+        new_pos = self.copyFromPacktime(packpos, self.file_end, output, index)
+
+        self._commit_lock_acquire()
+        self.locked = 1
+        self._lock_acquire()
+        try:
+            # Re-open the file in unbuffered mode.
+
+            # The main thread may write new transactions to the file,
+            # which creates the possibility that we will read a status
+            # 'c' transaction into the pack thread's stdio buffer even
+            # though we're acquiring the commit lock.  Transactions
+            # can still be in progress throughout much of packing, and
+            # are written to the same physical file but via a distinct
+            # Python file object.  The code used to leave off the
+            # trailing 0 argument, and then on every platform except
+            # native Windows it was observed that we could read stale
+            # data from the tail end of the file.
+            self._file.close()  # else self.gc keeps the original alive & open
+            self._file = open(self._path, "rb", 0)
+            self._file.seek(0, 2)
+            self.file_end = self._file.tell()
+        finally:
+            self._lock_release()
+        # NOTE(review): ``ipos`` is never assigned in this method, so the
+        # next line raises NameError.  It presumably should be the input
+        # offset already copied (the file_end value used above) -- confirm.
+        if ipos < self.file_end:
+            self.copyRest(ipos)
+
+        # OK, we've copied everything.  Now we need to wrap things up.
+        # NOTE(review): the copying above writes to the local ``output``,
+        # but the wrap-up goes through ``self._tfile``, which this method
+        # never assigns -- confirm they are meant to be the same file.
+        pos = self._tfile.tell()
+        self._tfile.flush()
+        self._tfile.close()
+        self._file.close()
+
+        return pos
+
+
+    def buildPackIndex(self, stop, file_end):
+        """Scan transactions up to the pack time and index their records.
+
+        Walks the file from offset 4 until ``file_end`` or the first
+        transaction whose tid is greater than ``stop``.
+
+        Returns a 4-tuple ``(packed, index, references, pos)``:
+
+        - ``packed``: True if every scanned transaction has status "p".
+        - ``index``: maps each oid to the position of its last scanned
+          data record.
+        - ``references``: maps the integer form of an oid to the set of
+          integer oids returned by ``_refs`` for its last scanned record
+          that yielded any.
+        - ``pos``: the offset at which scanning stopped.
+
+        Calls ``self.fail`` for versioned records and for a trailing
+        transaction length that disagrees with the header.
+        """
+        index = BTrees.fsBTree.fsIndex()
+        # NOTE(review): only BTrees.LLBTree is imported in the visible part
+        # of this module; confirm BTrees.LOBTree and BTrees.fsBTree are
+        # importable as attributes here.
+        references = BTrees.LOBTree.LOBTree()
+        pos = 4L
+        packed = True
+        while pos < file_end:
+            th = self._read_txn_header(pos)
+            if th.tid > stop:
+                break
+            self.checkTxn(th, pos)
+            if th.status != "p":
+                packed = False
+
+            tpos = pos
+            end = pos + th.tlen
+            pos += th.headerlen()
+
+            while pos < end:
+                dh = self._read_data_header(pos)
+                self.checkData(th, tpos, dh, pos)
+                if dh.version:
+                    self.fail(pos, "Versions are not supported")
+                index[dh.oid] = pos
+                refs = self._refs(dh)
+                if refs:
+                    # references is keyed by integer oid (gc() looks it up
+                    # that way), so convert the oid before storing.
+                    references[u64(dh.oid)] = refs
+
+                pos += dh.recordlen()
+
+            tlen = self._read_num(pos)
+            if tlen != th.tlen:
+                self.fail(pos, "redundant transaction length does not "
+                          "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+            pos += 8
+
+        return packed, index, references, pos
+
+    def updateReferences(self, references, pos, file_end):
+        """Merge references from transactions in ``[pos, file_end)``.
+
+        For every data record in that range, the references found by
+        ``_refs`` are combined with any set already stored in
+        ``references`` for the same object, and the result is stored back.
+        ``references`` is modified in place; nothing is returned.
+
+        Calls ``self.fail`` for versioned records and for a trailing
+        transaction length that disagrees with the header.
+        """
+        while pos < file_end:
+            th = self._read_txn_header(pos)
+            self.checkTxn(th, pos)
+
+            tpos = pos
+            end = pos + th.tlen
+            pos += th.headerlen()
+
+            while pos < end:
+                dh = self._read_data_header(pos)
+                self.checkData(th, tpos, dh, pos)
+                if dh.version:
+                    self.fail(pos, "Versions are not supported")
+                # The mapping passed in is keyed by integer oid, so look
+                # up and store under u64(dh.oid).
+                ioid = u64(dh.oid)
+                refs = self._refs(dh, references.get(ioid))
+                if refs:
+                    references[ioid] = refs
+
+                pos += dh.recordlen()
+
+            tlen = self._read_num(pos)
+            if tlen != th.tlen:
+                self.fail(pos, "redundant transaction length does not "
+                          "match initial transaction length: %d != %d",
+                          tlen, th.tlen)
+            pos += 8
+
+    def _refs(self, dh, initial=None):
+        """Return the set of integer oids referenced by a data record.
+
+        ``dh`` is a data record header.  ``initial`` is an optional
+        existing set to merge into; note that it is updated in place.
+
+        Returns ``initial`` unchanged when the record that holds the data
+        has no pickle or ``self.referencesf`` finds no references.
+        Otherwise returns a new LLSet holding ``initial`` plus the new
+        references.
+        """
+        # Chase backpointers until we get to the record with the refs
+        while dh.back:
+            dh = self._read_data_header(dh.back)
+
+        if not dh.plen:
+            return initial
+
+        # NOTE(review): this relies on _read_data_header leaving the file
+        # positioned at the start of the record's data -- confirm.
+        refs = self.referencesf(self._file.read(dh.plen))
+        if not refs:
+            return initial
+
+        if not initial:
+            initial = BTrees.LLBTree.LLSet()
+        initial.update(u64(oid) for oid in refs)
+        # Rebuild the set from its sorted contents in one step
+        # (presumably to get a compact representation -- confirm).
+        result = BTrees.LLBTree.LLSet()
+        result.__setstate__((tuple(initial),))
+        return result
+
+    def gc(self, index, references):
+        """Return the index entries reachable from integer oid 0.
+
+        Entries are moved out of ``index`` into the returned fsIndex as
+        they are visited, the reference sets consulted are popped from
+        ``references``, and ``references`` is emptied before returning.
+        """
+        live = BTrees.fsBTree.fsIndex()
+        pending = [0]
+        while pending:
+            int_oid = pending.pop()
+            key = p64(int_oid)
+            if key not in live:
+                # First visit: claim the record and queue what it refers to.
+                live[key] = index.pop(key)
+                pending.extend(references.pop(int_oid, ()))
+        references.clear()
+        return live
+
+    def copyToPacktime(self, packpos, index, output):
+        """Copy current records from before ``packpos`` into ``output``.
+
+        The file metadata is copied first.  Then, for each transaction
+        before ``packpos``, only records whose position matches
+        ``index[oid]`` are copied.  A transaction header is written (with
+        status "p") only if at least one of its records is copied.
+
+        Returns ``(new_index, new_pos)``: ``new_index`` maps each copied
+        oid to its record position in ``output``, and ``new_pos`` is the
+        output position after the last transaction written
+        (``self._metadata_size`` if none were).
+        """
+        pos = new_pos = self._metadata_size
+        # Earlier passes have moved the input file position, so rewind
+        # before reading the metadata from the start of the file.
+        self._file.seek(0)
+        output.write(self._file.read(self._metadata_size))
+        new_index = BTrees.fsBTree.fsIndex()
+
+        while pos < packpos:
+            th = self._read_txn_header(pos)
+            new_tpos = 0L
+            tend = pos + th.tlen
+            pos += th.headerlen()
+            while pos < tend:
+                h = self._read_data_header(pos)
+                if index.get(h.oid) != pos:
+                    pos += h.recordlen()
+                    continue
+
+                pos += h.recordlen()
+
+                # If we are going to copy any data, we need to copy
+                # the transaction header.  Note that we will need to
+                # patch up the transaction length when we are done.
+                if not new_tpos:
+                    th.status = "p"
+                    new_tpos = output.tell()
+                    output.write(th.asString())
+
+                if h.plen:
+                    data = self._file.read(h.plen)
+                else:
+                    # If a current record has a backpointer, fetch
+                    # refs and data from the backpointer.  We need
+                    # to write the data in the new record.
+                    data = self.fetchBackpointer(h.oid, h.back) or ''
+
+                h.prev = 0
+                h.back = 0
+                h.plen = len(data)
+                h.tloc = new_tpos
+                new_index[h.oid] = output.tell()
+                output.write(h.asString())
+                output.write(data)
+                if not data:
+                    # Packed records never have backpointers (?).
+                    # If there is no data, write a z64 backpointer.
+                    # This is a George Bailey event.
+                    output.write(z64)
+
+            if new_tpos:
+                new_pos = output.tell()
+                tlen = p64(new_pos - new_tpos)
+                output.write(tlen)
+                new_pos += 8
+
+                # Update the transaction length
+                output.seek(new_tpos + 8)
+                output.write(tlen)
+                output.seek(new_pos)
+
+            pos += 8
+
+        return new_index, new_pos
+
+    def copyFromPacktime(self, pos, file_end, output, index):
+        """Copy every transaction in ``[pos, file_end)`` into ``output``.
+
+        Each transaction header is written as-is and each data record is
+        handed to ``self._copier.copy``.  If the copied transaction's
+        length differs from the original, the length field in the header
+        already written is patched.
+
+        Returns the output position after the last transaction written
+        (the current output position if the range is empty).  ``index``
+        is accepted but not used here.
+
+        Calls ``self.fail`` for versioned records.
+        """
+        new_pos = output.tell()
+        while pos < file_end:
+            th = self._read_txn_header(pos)
+            new_tpos = output.tell()
+            output.write(th.asString())
+            tend = pos + th.tlen
+            pos += th.headerlen()
+            while pos < tend:
+                h = self._read_data_header(pos)
+
+                prev_txn = None
+                if h.plen:
+                    data = self._file.read(h.plen)
+                else:
+                    # If a current record has a backpointer, fetch
+                    # refs and data from the backpointer.  We need
+                    # to write the data in the new record.
+                    data = self.fetchBackpointer(h.oid, h.back)
+                    if h.back:
+                        prev_txn = self.getTxnFromData(h.oid, h.back)
+
+                if h.version:
+                    self.fail(pos, "Versions are not supported.")
+
+                # NOTE(review): copyOne calls self._copier.copy without
+                # the version argument; confirm which signature the
+                # copier actually has.
+                self._copier.copy(h.oid, h.tid, data, h.version, prev_txn,
+                                  new_tpos, output.tell())
+
+                pos += h.recordlen()
+
+            new_pos = output.tell()
+            new_tlen = new_pos - new_tpos
+            output.write(p64(new_tlen))
+            new_pos += 8
+
+            # Compare as integers against the transaction header; the
+            # data header variable may not even be bound here.
+            if new_tlen != th.tlen:
+                # Update the transaction length
+                output.seek(new_tpos + 8)
+                output.write(p64(new_tlen))
+                output.seek(new_pos)
+
+            pos += 8
+
+        return new_pos
+
+
+
+    def copyOne(self, ipos):
+        """Copy the transaction at input offset ``ipos`` to ``self._tfile``.
+
+        On every 20th call the commit lock is released while the
+        transaction is copied and re-acquired afterwards.  Returns the
+        input offset just past the copied transaction.
+        """
+        # Reading the header raises CorruptedDataError at EOF.
+        txn = self._read_txn_header(ipos)
+        self._lock_counter += 1
+        if self._lock_counter % 20 == 0:
+            self._commit_lock_release()
+
+        out_start = self._tfile.tell()
+        self._copier.setTxnPos(out_start)
+        self._tfile.write(txn.asString())
+
+        txn_end = ipos + txn.tlen
+        ipos += txn.headerlen()
+        while ipos < txn_end:
+            rec = self._read_data_header(ipos)
+            ipos += rec.recordlen()
+
+            earlier_txn = None
+            if not rec.plen:
+                payload = self.fetchBackpointer(rec.oid, rec.back)
+                if rec.back:
+                    earlier_txn = self.getTxnFromData(rec.oid, rec.back)
+            else:
+                payload = self._file.read(rec.plen)
+
+            if rec.version:
+                self.fail(ipos, "Versions are not supported.")
+
+            self._copier.copy(rec.oid, rec.tid, payload, earlier_txn,
+                              out_start, self._tfile.tell())
+
+        # The copied transaction must be the same length as the original.
+        copied_len = self._tfile.tell() - out_start
+        assert copied_len == txn.tlen
+        self._tfile.write(p64(copied_len))
+        ipos += 8
+
+        # Fold this transaction's index entries into the main index.
+        self.index.update(self.tindex)
+        self.tindex.clear()
+        if self._lock_counter % 20 == 0:
+            self._commit_lock_acquire()
+        return ipos
+
+
More information about the Checkins
mailing list