[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Full.py:1.44.2.8
Barry Warsaw
barry@wooz.org
Wed, 30 Oct 2002 16:05:48 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv29771
Modified Files:
Tag: bdb-nolocks
Full.py
Log Message:
Checkpointing after implementing pack(). All the pack tests but two
pass, and I think I know why those fail, but I wanted to checkpoint
this code for now.
I had to add a zaprevs queue so as not to keep some bookkeeping
information in an in-core dictionary. Sigh.
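As an aside, below is a minimal, standalone sketch of the pattern this checkin relies on: a fixed-record-length BerkeleyDB queue (DB_QUEUE) used as on-disk scratch space for pack bookkeeping instead of an in-core dictionary. It is illustrative only and assumes the bsddb3 (pybsddb) bindings; the file name, record contents, and helper below are invented for the example and are not the storage's actual code.

# Illustrative sketch only (not from Full.py): an on-disk DB_QUEUE standing
# in for an in-core dict of "revisions to zap".
import os
import struct
import tempfile

from bsddb3 import db

qfile = os.path.join(tempfile.mkdtemp(), 'zaprevs-demo')
zaprevs = db.DB()
zaprevs.set_re_len(16)     # fixed-length records: 8-byte oid + 8-byte tid
zaprevs.open(qfile, dbtype=db.DB_QUEUE, flags=db.DB_CREATE)

def p64(n):
    # Pack an integer into an 8-byte big-endian string, like ZODB's p64()
    return struct.pack('>Q', n)

# "Sweep": record which revisions are packable without holding them all
# in memory at once.
for oid, tid in [(1, 5), (1, 7), (2, 3)]:
    zaprevs.append(p64(oid) + p64(tid))

# "Collect": consume the queue in FIFO order and act on each record.
rec = zaprevs.consume()
while rec:
    oid, tid = struct.unpack('>QQ', rec[1])
    print('would zap revision tid=%d of oid=%d' % (tid, oid))
    rec = zaprevs.consume()

zaprevs.truncate()         # leave the queue empty for the next pack
zaprevs.close()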
=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.44.2.7 => 1.44.2.8 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.44.2.7 Thu Oct 24 17:41:25 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py Wed Oct 30 16:05:47 2002
@@ -221,6 +221,10 @@
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
#
+ # zaprevs -- [oid+tid]
+ # This is another queue written during the sweep phase to collect
+ # all the object revisions that can be packed away.
+ #
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
self._refcounts = self._setupDB('refcounts')
@@ -244,6 +248,10 @@
# BAW: do we need to set the queue extent size?
self._oidqueue.open(self._prefix + 'oidqueue',
db.DB_QUEUE, db.DB_CREATE)
+ self._zaprevs = db.DB(self._env)
+ self._zaprevs.set_re_len(16)
+ self._zaprevs.open(self._prefix + 'zaprevs',
+ db.DB_QUEUE, db.DB_CREATE)
# DEBUGGING
#self._nextserial = 0L
# END DEBUGGING
@@ -256,6 +264,7 @@
# pack operation will reproduce it faithfully.
self._oidqueue.truncate()
self._packmark.truncate()
+ self._zaprevs.truncate()
# The pendings table may have entries if we crashed before we could
# abort or commit the outstanding ZODB transaction.
pendings = self._pending.keys()
@@ -299,6 +308,7 @@
self._pickleRefcounts.close()
self._packmark.close()
self._oidqueue.close()
+ self._zaprevs.close()
BerkeleyBase.close(self)
def _withtxn(self, meth, *args):
@@ -1265,21 +1275,80 @@
#
# There are two types of pack operations, the classic pack and autopack.
# Classic pack is the full blown mark and sweep operation, removing all
- # objects not reachable from the root. This can take a long time,
- # although the implementation attempts to mitigate both in-core memory
- # usage and blocking other, non-packing operations.
+ # revisions of all objects not reachable from the root. This can take a
+ # long time, although the implementation attempts to mitigate both in-core
+ # memory usage and blocking other, non-packing operations.
#
# Autopack is a more lightweight operation. It only removes non-current
# revisions in a window of transactions, and doesn't do a root
# reachability test.
#
+ def pack(self, t, zreferencesf):
+ # For all intents and purposes, referencesf here is always going to be
+ # the same as ZODB.referencesf.referencesf. It's too much of a PITA
+ # to pass that around to the helper methods, so just assert they're
+ # the same.
+ assert zreferencesf == referencesf
+ zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
+ # A simple wrapper around the bulk of packing, but which acquires a
+ # lock that prevents multiple packs from running at the same time.
+ self._packlock.acquire()
+ try:
+ # We don't wrap this in _withtxn() because we're going to do the
+ # operation across several Berkeley transactions. It makes
+ # bookkeeping harder, but it also allows other work to happen
+ # (stores and reads) while packing is being done.
+ self._dopack(t)
+ finally:
+ self._packlock.release()
+ zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
+
+ def _dopack(self, t):
+ # t is a TimeTime, i.e. a time float; convert it to a TimeStamp object
+ # using an algorithm similar to the one FileStorage uses. We know that
+ # our transaction ids, a.k.a. revision ids, are timestamps. BAW: this
+ # doesn't play nicely if you enable the `debugging tids' feature.
+ #
+ # BAW: should a pack time in the future be a ValueError? We'd have to
+ # worry about clock skew, so for now, we just set the pack time to the
+ # minimum of t and now.
+ packtime = min(t, time.time())
+ t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+ packtid = `t0`
+ # Calculate the set of objects reachable from the root. Anything else
+ # is a candidate for having all of its revisions packed away. The set
+ # of reachable objects lives in the _packmark table.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._mark)
+ finally:
+ self._lock_release()
+ # Now cruise backwards in time through all the transactions at or
+ # before the pack time, getting rid of any objects not reachable from
+ # the root, and any non-current revisions of reachable objects.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._sweep, packtid)
+ finally:
+ self._lock_release()
+ # Now we have the zaprevs table which contains a list of all object
+ # revisions that can get packed away. So zap 'em.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect)
+ finally:
+ self._lock_release()
+
def _mark(self, txn):
- # Find the oids for all the objects reachable. To reduce the amount
- # of in-core memory we need do do a pack operation, we'll save the
- # mark data in a BerkeleyDB table.
+ # Find the oids for all the objects reachable from the root. To
+ # reduce the amount of in-core memory we need to do a pack operation,
+ # we'll save the mark data in the packmark table. The oidqueue is a
+ # BerkeleyDB Queue that holds the list of object ids to look at next,
+ # and by using this we don't need to keep an in-memory dictionary.
assert len(self._packmark) == 0
assert len(self._oidqueue) == 0
+ assert len(self._zaprevs) == 0
# Quick exit for empty storages
if not self._serials:
return
@@ -1295,51 +1364,140 @@
self._packmark.put(oid, PRESENT, txn=txn)
# Get the pickle data for this object's current version
serial, tid = self._getSerialAndTidMissingOk(oid)
- lrevid = self._metadata[oid+tid][16:24]
- data = self._pickles[oid+lrevid]
- # Now get the oids of all the objects referenced by this pickle
- refdoids = []
- referencesf(data, refdoids)
- # And append them to the queue for later
- for oid in refdoids:
- self._oidqueue.append(oid, txn=txn)
- # Pop the next oid off the queue and do it all again
+ # If there's no root object (as is the case in some of the unit
+ # tests) and we're looking up oid ZERO, then serial will be None.
+ if serial is not None:
+ lrevid = self._metadata[oid+tid][16:24]
+ data = self._pickles[oid+lrevid]
+ # Now get the oids of all the objects referenced by this pickle
+ refdoids = []
+ referencesf(data, refdoids)
+ # And append them to the queue for later
+ for oid in refdoids:
+ self._oidqueue.append(oid, txn)
+ # Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume()
- oid = rec[1]
+ oid = rec and rec[1]
assert len(self._oidqueue) == 0
- def _sweep(self, txn):
+ def _sweep(self, txn, packtid):
# We need to get the pending tid to skip over any serials entries
- # matching this uncommitted transaction. It's entirely possible that
- # the pending transaction doesn't have all it's objects linked into
- # the root yet.
- pending = self._pending.keys()[0]
- c = self._serials.cursor(txn=txn)
+ # matching an uncommitted transaction. It's possible that the pending
+ # transaction doesn't have all its objects linked into the root yet.
+ cm = ct = None
try:
- rec = c.first()
+ cm = self._txnMetadata.cursor(txn=txn)
+ ct = self._txnoids.cursor(txn=txn)
+ # Find the latest transaction at or before the pack time. To do this
+ # we find the smallest txnMetadata entry that is greater than or equal
+ # to packtid, and then move back one entry if it's strictly greater.
+ try:
+ rec = cm.set_range(packtid)
+ except db.DBNotFoundError:
+ rec = cm.last()
+ if rec and rec[0] > packtid:
+ rec = cm.prev()
while rec:
- oid, tid = rec
- if len(tid) <> 8:
- tid = tidrec[8:]
- rec = c.next()
- if tid == pending:
- continue
- if self._packmark.has_key(oid):
- continue
- # This object is not referenced from any object reachable from
- # the root. We can eliminate all traces of it in the
- # database.
- self._zapobject(oid, txn)
+ tid, metadata = rec
+ rec = cm.prev()
+ # XXX
+## if metadata[0] == PROTECTED_TRANSACTION:
+## # We've scanned back to the last pack, so we don't need to
+## # go any further.
+## break
+ # Now look at the objects touched by this transaction. If it
+ # isn't root reachable, zap the whole object. If it is, but
+ # this revision isn't the current revision, then just this
+ # revision is packable.
+ orec = ct.set(tid)
+ while orec:
+ otid, oid = orec
+ orec = ct.next()
+ if otid <> tid:
+ break
+ if self._packmark.has_key(oid):
+ cserial, ctid = self._getSerialAndTid(oid)
+ if tid <> cserial:
+ # This is not the current revision
+ self._zaprevs.append(oid+tid, txn)
+ else:
+ # The whole object is packable
+ self._zaprevs.append(oid+DNE, txn)
finally:
- c.close()
- # And now we're done with the packmark table
+ if cm: cm.close()
+ if ct: ct.close()
+ # We're done with the mark table
self._packmark.truncate(txn=txn)
+ def _collect(self, txn):
+ rec = self._zaprevs.consume()
+ while rec:
+ revid = rec[1]
+ self._zaprevision(revid, txn)
+ rec = self._zaprevs.consume()
+ # And now we're done with this table too
+ self._zaprevs.truncate(txn=txn)
+
+ def _decref(self, deltas, txn):
+ for oid, delta in deltas.items():
+ refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+ if refcount <= 0:
+ self._zaprevs.append(oid+DNE, txn)
+ else:
+ self._refcounts.put(oid, p64(refcount), txn=txn)
+
+ def _zaprevision(self, revid, txn):
+ # This method gets called when we're just zapping a single revision of
+ # an object, which we'd do if the object is still reachable from the
+ # root. See _zapobject() for a more efficient way to delete all
+ # traces of the object, if it's no longer reachable from the root.
+ #
+ # Get the metadata record for this revision, so we can find the
+ # associated pickle data. Then we can zap the metadata record
+ oid = revid[:8]
+ tid = revid[8:]
+ if tid == DNE:
+ # The entire object has been reference counted away
+ self._zapobject(oid, txn)
+ return
+ metadata = self._metadata[revid]
+ self._metadata.delete(revid, txn=txn)
+ vid, nvrevid, lrevid, prevrevid = unpack('>8s8s8s8s', metadata)
+ # Decrement the pickle reference count
+ key = oid+lrevid
+ refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
+ if refcount <= 0:
+ # We can collect this pickle
+ self._pickleRefcounts.delete(key, txn=txn)
+ data = self._pickles[key]
+ self._pickles.delete(key, txn=txn)
+ deltas = {}
+ self._update(deltas, data, -1)
+ self._decref(deltas, txn)
+ else:
+ self._pickleRefcounts.put(key, p64(refcount), txn=txn)
+ # Delete the txnoid table entry for this revision
+ c = self._txnoids.cursor(txn=txn)
+ try:
+ c.set_both(tid, oid)
+ c.delete()
+ finally:
+ c.close()
+
def _zapobject(self, oid, txn):
# Delete all current serial number records
c = self._serials.cursor(txn=txn)
try:
- rec = c.get(oid)
+ try:
+ rec = c.set(oid)
+ except db.DBNotFoundError:
+ # zaprevs could have multiple entries in it for the same
+ # oid+DNE combination. Rather than make sure zaprevs has
+ # unique keys (which would require an in-core dict or yet
+ # another table), we'll just assume that if the oid isn't in
+ # the serials table, we've already collected it in a previous
+ # call of _zapobject().
+ return
while rec:
if rec[0] <> oid:
break
@@ -1362,6 +1520,8 @@
self._update(deltas, data, -1)
finally:
c.close()
+ # Update the reference counts based on the deltas
+ self._decref(deltas, txn)
# Delete the pickleRefcounts entry for all revisions of this object
c = self._pickleRefcounts.cursor(txn=txn)
try:
@@ -1373,8 +1533,13 @@
rec = c.next()
finally:
c.close()
- # Delete the entry in the refcounts table for this object
- self._refcounts.delete(oid, txn=txn)
+ # Delete the entry in the refcounts table for this object, if there is
+ # one. Some of the unit tests don't link objects so there'll never be
+ # refcounts for them.
+ try:
+ self._refcounts.delete(oid, txn=txn)
+ except db.DBNotFoundError:
+ pass
# Now cruise through all the metadata records for this object, keeping
# track of the tids and vids so that we can clean up the txnoids and
# currentVersions tables
@@ -1390,9 +1555,10 @@
c.delete()
rec = c.next()
tid = revid[8:]
- vid = metadata[:8]
tids[tid] = 1
- vids[vid] = 1
+ vid = metadata[:8]
+ if vid <> ZERO:
+ vids[vid] = 1
finally:
c.close()
# Zap currentVersions entries
@@ -1419,112 +1585,6 @@
c.delete()
finally:
c.close()
- return deltas
-
- def _dopack(self, txn, t):
- # t is a TimeTime, or time float, convert this to a TimeStamp object,
- # using an algorithm similar to what's used in FileStorage. We know
- # that our transaction ids, a.k.a. revision ids, are timestamps. BAW:
- # This doesn't play nicely if you enable the `debugging revids'
- #
- # BAW: should a pack time in the future be a ValueError? We'd have to
- # worry about clock skew, so for now, we just set the pack time to the
- # minimum of t and now.
- packtime = min(t, time.time())
- t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
- packtid = `t0`
- # Calculate the set of objects reachable from the root. Anything else
- # is a candidate for having all their revisions packed away.
- self._mark(txn)
- # We now cruise through all the objects we know about, i.e. the keys
- # of the serials table, looking at all the object revisions earlier
- # than the pack time. If the revision is not the current revision,
- # then it's a packable revision. We employ a BDB trick of set_range()
- # to give us the smallest record greater than or equal to the one we
- # ask for. We move to the one just before that, and cruise backwards.
- #
- # This should also make us immune to evil future-pack time values,
- # although it would still be better to raise a ValueError in those
- # situations. This is a dictionary keyed off the object id, with
- # values which are a list of revisions (oid+tid) that can be packed.
- packablerevs = {}
- c = self._metadata.cursor()
- try:
- # BAW: can two threads be packing at the same time? If so, we
- # need to handle that. If not, we should enforce that with a
- # pack-lock.
- for oid in self._serials.keys():
- try:
- rec = c.set_range(oid+packtid)
- # The one just before this should be the largest record
- # less than or equal to the key, i.e. the object revision
- # just before the given pack time.
- rec = c.prev()
- except db.DBNotFoundError:
- # Perhaps the last record in the database is the last one
- # containing this oid?
- rec = c.last()
- # Now move backwards in time to look at all the revisions of
- # this object. All but the current one are packable, unless
- # the object isn't reachable from the root, in which case, all
- # its revisions are packable.
- while rec:
- key, data = rec
- rec = c.prev()
- # Make sure we're still looking at revisions for this
- # object
- if oid <> key[:8]:
- break
- if not reachables.has_key(oid):
- packablerevs.setdefault(oid, []).append(key)
- # Otherwise, if this isn't the current revision for this
- # object, then it's packable.
- else:
- serial, tid = self._getSerialAndTid(oid)
- if tid <> key[8:]:
- packablerevs.setdefault(oid, []).append(key)
- finally:
- c.close()
- # We now have all the packable revisions we're going to handle. For
- # each object with revisions that we're going to pack away, acquire
- # the storage lock so we can do that without fear of trampling by
- # other threads (i.e. interaction of transactionalUndo() and pack()).
- #
- # This set contains the oids of all objects that have been decref'd
- # to zero by the pack operation. To avoid recursion, we'll just note
- # them now and handle them in a loop later.
- #
- # BAW: should packs be transaction protected?
- decrefoids = {}
- for oid in packablerevs.keys():
- self._lock_acquire()
- try:
- for key in packablerevs[oid]:
- self._zaprevision(key, decrefoids, referencesf)
- finally:
- self._lock_release()
- # While there are still objects to collect, continue to do so.
- # Note that collecting an object may reveal more objects that are
- # dec refcounted to zero.
- while decrefoids:
- oid, ignore = decrefoids.popitem()
- self._zapobject(oid, decrefoids, referencesf)
-
- def pack(self, t, zreferencesf):
- # For all intents and purposes, referencesf here is always going to be
- # the same as ZODB.referencesf.referencesf. It's too much of a PITA
- # to pass that around to the helper methods, so just assert they're
- # the same.
- assert zreferencesf == referencesf
- zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
- # A simple wrapper around the bulk of packing, but which acquires a
- # lock that prevents multiple packs from running at the same time.
- self._packlock.acquire()
- try:
- self._withtxn(self._dopack, t)
- finally:
- self._packlock.release()
- zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
#
# GCable interface, for cyclic garbage collection (untested)