[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Minimal.py:1.14
Barry Warsaw
barry@wooz.org
Mon, 11 Nov 2002 15:59:45 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv21428
Modified Files:
Minimal.py
Log Message:
Added pack and autopack support. Packing is only necessary to garbage
collect cycles.
Also, some code updating and re-org to factor code into the base
class.
_dostore(): Support conflict resolution.
=== ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py 1.13 => 1.14 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py:1.13 Tue Nov 5 18:07:32 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py Mon Nov 11 15:59:44 2002
@@ -17,6 +17,9 @@
__version__ = '$Revision$'[-2:][0]
+import time
+import threading
+
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
# PyBSDDB3.
@@ -29,15 +32,29 @@
from ZODB import POSException
from ZODB.utils import U64, p64
from ZODB.referencesf import referencesf
+from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
+import zLOG
ABORT = 'A'
COMMIT = 'C'
PRESENT = 'X'
ZERO = '\0'*8
+# Number of seconds for the autopack thread to sleep before checking to see if
+# it's time for another autopack run. Lower numbers mean more processing,
+# higher numbers mean less responsiveness to shutdown requests. 10 seconds
+# seems like a good compromise.
+AUTOPACK_CHECK_SLEEP = 10
+
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
-class Minimal(BerkeleyBase):
+class Minimal(BerkeleyBase, ConflictResolvingStorage):
def _setupDBs(self):
# Data Type Assumptions:
#
@@ -77,53 +94,74 @@
# no pending entry. It is a database invariant that if the
# pending table is empty, the oids table must also be empty.
#
+ # packmark -- [oid]
+ # Every object reachable from the root during a classic pack
+ # operation will have its oid present in this table.
+ #
+ # oidqueue -- [oid]
+ # This table is a Queue, not a BTree. It is used during the mark
+ # phase of pack() and contains a list of oids for work to be done.
+ # It is also used during pack to list objects for which no more
+ # references exist, such that the objects can be completely packed
+ # away.
+ #
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
self._refcounts = self._setupDB('refcounts')
self._oids = self._setupDB('oids')
self._pending = self._setupDB('pending')
+ # Tables to support packing.
+ self._packmark = self._setupDB('packmark')
+ self._oidqueue = db.DB(self._env)
+ self._oidqueue.set_re_len(8)
+ # BAW: do we need to set the queue extent size?
+ self._oidqueue.open(self._prefix + 'oidqueue',
+ db.DB_QUEUE, db.DB_CREATE)
# Do recovery and consistency checks
pendings = self._pending.keys()
assert len(pendings) <= 1
if len(pendings) == 0:
assert len(self._oids) == 0
- return
- # Do recovery
- tid = pendings[0]
- flag = self._pending.get(tid)
- assert flag in (ABORT, COMMIT)
- self._lock_acquire()
- try:
- if flag == ABORT:
- self._do(self._doabort, tid)
- else:
- self._do(self._docommit, tid)
- finally:
- self._lock_release()
+ else:
+ # Do recovery
+ tid = pendings[0]
+ flag = self._pending.get(tid)
+ assert flag in (ABORT, COMMIT)
+ self._lock_acquire()
+ try:
+ if flag == ABORT:
+ self._withtxn(self._doabort, tid)
+ else:
+ self._withtxn(self._docommit, tid)
+ finally:
+ self._lock_release()
+ # Set up the autopacking thread
+ if self._config.frequency > 0:
+ config = self._config
+ self._autopacker = _Autopack(self, config.frequency)
+ self._autopacker.start()
def close(self):
+ # We must stop the autopacker first before closing any tables. BAW:
+ # should we use a timeout on the join() call? I'm not sure. On the
+ # one hand we don't want to block forever, but on the other, killing
+ # the autopacker thread in the middle of real work could leave the
+ # databases in a corrupted state, requiring recovery. With a
+ # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
+ if self._autopacker:
+ zLOG.LOG('Minimal storage', zLOG.INFO, 'stopping autopack thread')
+ self._autopacker.stop()
+ self._autopacker.join()
self._serials.close()
self._pickles.close()
self._refcounts.close()
self._oids.close()
self._pending.close()
+ self._packmark.close()
+ self._oidqueue.close()
BerkeleyBase.close(self)
- def _do(self, meth, tid):
- txn = self._env.txn_begin()
- try:
- meth(tid, txn)
- self._oids.truncate(txn)
- self._pending.truncate(txn)
- except:
- txn.abort()
- self._docheckpoint()
- raise
- else:
- txn.commit()
- self._docheckpoint()
-
- def _doabort(self, tid, txn):
+ def _doabort(self, txn, tid):
co = cs = None
try:
co = self._oids.cursor(txn=txn)
@@ -145,8 +183,14 @@
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
+ # We're done with these tables
+ self._oids.truncate(txn)
+ self._pending.truncate()
+
+ def _abort(self):
+ self._withtxn(self._doabort, self._serial)
- def _docommit(self, tid, txn):
+ def _docommit(self, txn, tid):
deltas = {}
co = cs = None
try:
@@ -183,6 +227,9 @@
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
+ # We're done with this table
+ self._oids.truncate(txn)
+ self._pending.truncate()
        # Now, to finish up, we need to apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0
# objects.
@@ -192,9 +239,9 @@
def _update_refcounts(self, deltas, txn):
newdeltas = {}
for oid, delta in deltas.items():
- rc = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
- assert rc >= 0
- if rc == 0:
+ refcount = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
+ assert refcount >= 0
+ if refcount == 0:
# The reference count for this object has just gone to zero,
# so we can safely remove all traces of it from the serials,
# pickles and refcounts table. Note that before we remove its
@@ -209,7 +256,7 @@
self._refcounts.delete(oid, txn=txn)
self._pickles.delete(oid+current, txn=txn)
else:
- self._refcounts.put(oid, p64(rc), txn=txn)
+ self._refcounts.put(oid, p64(refcount), txn=txn)
# Return the list of objects referenced by pickles just deleted in
# this round, for decref'ing on the next go 'round.
return newdeltas
@@ -220,6 +267,29 @@
# will be aborted.
self._pending[self._serial] = ABORT
+ def _dostore(self, txn, oid, serial, data):
+ conflictresolved = False
+ oserial = self._getCurrentSerial(oid)
+ if oserial is not None and serial <> oserial:
+ # The object exists in the database, but the serial number
+ # given in the call is not the same as the last stored serial
+ # number. Raise a ConflictError.
+ data = self.tryToResolveConflict(oid, oserial, serial, data)
+ if data:
+ conflictresolved = True
+ else:
+ raise POSException.ConflictError(serials=(oserial, serial))
+ # Optimistically write to the serials and pickles table. Be sure
+ # to also update the oids table for this object too.
+ newserial = self._serial
+ self._serials.put(oid, newserial, txn=txn)
+ self._pickles.put(oid+newserial, data, txn=txn)
+ self._oids.put(oid, PRESENT, txn=txn)
+ # Return the new serial number for the object
+ if conflictresolved:
+ return ResolvedSerial
+ return newserial
+
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
@@ -229,43 +299,16 @@
# All updates must be done with the application lock acquired
self._lock_acquire()
try:
- oserial = self._getCurrentSerial(oid)
- if oserial is not None and serial <> oserial:
- # The object exists in the database, but the serial number
- # given in the call is not the same as the last stored serial
- # number. Raise a ConflictError.
- #
- # BAW: do application level conflict resolution
- raise POSException.ConflictError(serials=(oserial, serial))
- # Optimistically write to the serials and pickles table. Be sure
- # to also update the oids table for this object too.
- newserial = self._serial
- txn = self._env.txn_begin()
- try:
- self._serials.put(oid, newserial, txn=txn)
- self._pickles.put(oid+newserial, data, txn=txn)
- self._oids.put(oid, PRESENT, txn=txn)
- except:
- txn.abort()
- self._docheckpoint()
- raise
- else:
- txn.commit()
- self._docheckpoint()
+ return self._withtxn(self._dostore, oid, serial, data)
finally:
self._lock_release()
- # Return the new serial number for the object
- return newserial
def _finish(self, tid, u, d, e):
# Twiddle the pending flag to COMMIT now since after the vote call, we
# promise that the changes will be committed, no matter what. The
# recovery process will check this.
self._pending[self._serial] = COMMIT
- self._do(self._docommit, self._serial)
-
- def _abort(self):
- self._do(self._doabort, self._serial)
+ self._withtxn(self._docommit, self._serial)
#
# Accessor interface
@@ -326,3 +369,207 @@
# So BaseStorage.getSerial() just works. Note that this storage
# doesn't support versions.
return ''
+
+ #
+ # Packing. In Minimal storage, packing is only required to get rid of
+ # object cycles, since there are no old object revisions.
+ #
+
+ def pack(self, t, zreferencesf):
+ # For all intents and purposes, referencesf here is always going to be
+ # the same as ZODB.referencesf.referencesf. It's too much of a PITA
+ # to pass that around to the helper methods, so just assert they're
+ # the same.
+ assert zreferencesf == referencesf
+ zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack started')
+ # A simple wrapper around the bulk of packing, but which acquires a
+ # lock that prevents multiple packs from running at the same time.
+ self._packlock.acquire()
+ try:
+ # We don't wrap this in _withtxn() because we're going to do the
+ # operation across several Berkeley transactions, which allows
+ # other work to happen (stores and reads) while packing is being
+ # done.
+ #
+ # Also, we don't care about the pack time, since we don't need to
+ # collect object revisions
+ self._dopack()
+ finally:
+ self._packlock.release()
+ zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack finished')
+
+ def _dopack(self):
+ # Do a mark and sweep for garbage collection. Calculate the set of
+ # objects reachable from the root. Anything else is a candidate for
+ # having all their revisions packed away. The set of reachable
+ # objects lives in the _packmark table.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._mark)
+ finally:
+ self._lock_release()
+ # Now perform a sweep, using oidqueue to hold all object ids for
+ # objects which are not root reachable as of the pack time.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._sweep)
+ finally:
+ self._lock_release()
+ # Once again, collect any objects with refcount zero due to the mark
+ # and sweep garbage collection pass.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect_objs)
+ finally:
+ self._lock_release()
+
+ def _mark(self, txn):
+ # Find the oids for all the objects reachable from the root. To
+        # reduce the amount of in-core memory we need to do a pack operation,
+ # we'll save the mark data in the packmark table. The oidqueue is a
+ # BerkeleyDB Queue that holds the list of object ids to look at next,
+ # and by using this we don't need to keep an in-memory dictionary.
+ assert len(self._packmark) == 0
+ assert len(self._oidqueue) == 0
+ # Quick exit for empty storages
+ if not self._serials:
+ return
+ # The oid of the object we're looking at, starting at the root
+ oid = ZERO
+ # Start at the root, find all the objects the current revision of the
+ # root references, and then for each of those, find all the objects it
+ # references, and so on until we've traversed the entire object graph.
+ while oid:
+ if self._packmark.has_key(oid):
+ # We've already seen this object
+ continue
+ self._packmark.put(oid, PRESENT, txn=txn)
+ # Get the pickle data for this object
+ tid = self._getCurrentSerial(oid)
+ # Say there's no root object (as is the case in some of the unit
+ # tests), and we're looking up oid ZERO. Then serial will be None.
+ if tid is not None:
+ data = self._pickles[oid+tid]
+ # Now get the oids of all the objects referenced by this pickle
+ refdoids = []
+ referencesf(data, refdoids)
+ # And append them to the queue for later
+ for oid in refdoids:
+ self._oidqueue.append(oid, txn)
+ # Pop the next oid off the queue and do it all again
+ rec = self._oidqueue.consume()
+ oid = rec and rec[1]
+ assert len(self._oidqueue) == 0
+
+ def _sweep(self, txn):
+ c = self._serials.cursor(txn=txn)
+ try:
+ rec = c.first()
+ while rec:
+ oid = rec[0]
+ rec = c.next()
+ # If packmark (which knows about all the root reachable
+ # objects) doesn't have a record for this guy, then we can zap
+ # it. Do so by appending to oidqueue.
+ if not self._packmark.has_key(oid):
+ self._oidqueue.append(oid, txn)
+ finally:
+ c.close()
+ # We're done with the mark table
+ self._packmark.truncate(txn=txn)
+
+ def _collect_objs(self, txn):
+ orec = self._oidqueue.consume()
+ while orec:
+ oid = orec[1]
+ # Delete the object from the serials table
+ c = self._serials.cursor(txn)
+ try:
+ rec = c.set(oid)
+ while rec and rec[0] == oid:
+ c.delete()
+ rec = c.next_dup()
+ # We don't need the refcounts any more, but note that if the
+ # object was never referenced from another object, there may
+ # not be a refcounts entry.
+ try:
+ self._refcounts.delete(oid, txn=txn)
+ except db.DBNotFoundError:
+ pass
+ finally:
+ c.close()
+ # Now collect the pickle data and do reference counting
+ c = self._pickles.cursor(txn)
+ try:
+ rec = c.set_range(oid)
+ while rec and rec[0][:8] == oid:
+ data = rec[1]
+ c.delete()
+ rec = c.next()
+ deltas = {}
+ self._update(deltas, data, -1)
+ for oid, delta in deltas.items():
+ refcount = U64(self._refcounts.get(oid, ZERO)) + delta
+ assert refcount >= 0
+ if refcount == 0:
+ self._oidqueue.append(oid, txn)
+ else:
+ self._refcounts.put(oid, p64(refcount), txn=txn)
+ finally:
+ c.close()
+ # We really do want this down here, since _decrefPickle() could
+ # add more items to the queue.
+ orec = self._oidqueue.consume()
+ assert len(self._oidqueue) == 0
+
+ #
+ # Stuff we don't support
+ #
+
+ def supportsTransactionalUndo(self):
+ return False
+
+ def supportsUndo(self):
+ return False
+
+ def supportsVersions(self):
+ return False
+
+ # Don't implement these
+ #
+ # versionEmpty(self, version)
+ # versions(self, max=None)
+ # loadSerial(self, oid, serial)
+ # getSerial(self, oid)
+ # transactionalUndo(self, tid, transaction)
+ # undoLog(self, first=0, last=-20, filter=None)
+ # history(self, oid, version=None, size=1, filter=None)
+ # iterator(self, start=None, stop=None)
+
+
+
+class _Autopack(threading.Thread):
+ def __init__(self, storage, frequency):
+ threading.Thread.__init__(self)
+ self._storage = storage
+ self._frequency = frequency
+ # Bookkeeping
+ self._stop = False
+ self._nextpack = 0
+
+ def run(self):
+ zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread started')
+ while not self._stop:
+ now = time.time()
+ if now > self._nextpack:
+ # Run the autopack phase
+ self._storage.pack('ignored', referencesf)
+ self._nextpack = now + self._frequency
+ # Now we sleep for a little while before we check again. Sleep
+        # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so as
+        # to be as responsive as possible to .stop() calls.
+ time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
+ zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread finished')
+
+ def stop(self):
+ self._stop = True