[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Full.py:1.48
Barry Warsaw
barry@wooz.org
Fri, 8 Nov 2002 18:17:16 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv3681
Modified Files:
Full.py
Log Message:
Support for autopacking in a separate thread. BerkeleyConfig now has
three new configuration variables for controlling how autopacking
works: an autopack frequency, a "packtime" -- the point in the past
you want to pack to -- and a counter that turns every Nth autopack run
into a classic pack (one that also garbage collects unreachable
objects).
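For illustration only, here is a minimal sketch of how those knobs
might be set. The attribute names frequency, packtime and classicpack
come straight from the code below, but the module paths, the Full()
constructor signature and the example values are assumptions:

    # Hypothetical example -- import paths and constructor signature are
    # assumptions; only the config attribute names are taken from this change.
    from bsddb3Storage.BerkeleyBase import BerkeleyConfig
    from bsddb3Storage.Full import Full

    config = BerkeleyConfig()
    config.frequency = 60 * 60      # autopack every hour; <= 0 disables it
    config.packtime = 4 * 60 * 60   # pack away revisions older than 4 hours
    config.classicpack = 24         # every 24th run is a classic pack (gc)

    storage = Full('fullstorage', config=config)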
Specific changes here include:
_setupDBs(): If autopacking is enabled, create the autopacking thread
object and get it started.
close(): When shutting down the storage, we need to stop and join the
autopacking thread before closing any tables, otherwise I think we
stand a good chance of corrupting our database (requiring recovery).
See the usage sketch after this list.
_dopack(): Add a flag for whether full gc should be done or not.
That's about the only difference between classic pack and autopack
(the latter does not do gc).
autopack(): The method that the autopacking thread calls to start an
autopack. It takes a pack time with the same semantics as pack(), but
it also takes a flag specifying whether to do garbage collection of
unreachable objects or not.
_Autopack: A subclass of threading.Thread that handles the background
autopacking.
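As a rough lifecycle sketch (same assumptions as the example above):
the thread is started when the storage sets up its tables, and it is
stopped and joined in close() before any tables are closed:

    storage = Full('fullstorage', config=config)  # _setupDBs() starts _Autopack
    try:
        # normal stores and loads happen here; autopack runs in the background
        pass
    finally:
        # close() stops and join()s the autopack thread before closing tables
        storage.close()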
=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.47 => 1.48 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.47 Fri Nov 8 14:35:51 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py Fri Nov 8 18:17:15 2002
@@ -19,6 +19,7 @@
import sys
import time
+import threading
import cPickle as pickle
from struct import pack, unpack
@@ -51,6 +52,12 @@
# DEBUGGING
#DNE = 'nonexist'
+# Number of seconds for the autopack thread to sleep before checking to see if
+# it's time for another autopack run. Lower numbers mean more processing,
+# higher numbers mean less responsiveness to shutdown requests. 10 seconds
+# seems like a good compromise.
+AUTOPACK_CHECK_SLEEP = 10
+
try:
# Python 2.2
from _helper import incr
@@ -79,6 +86,8 @@
"""
self._packlock = ThreadLock.allocate_lock()
BerkeleyBase.__init__(self, name, env, prefix, config)
+ # The autopack thread is started in _setupDBs() because we need the
+ # last pack time, which is stored in one of the tables.
def _setupDBs(self):
# Data Type Assumptions:
@@ -252,9 +261,18 @@
db.DB_QUEUE, db.DB_CREATE)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
- # DEBUGGING
- #self._nextserial = 0L
- # END DEBUGGING
+ # Set up the autopacking thread
+ if self._config.frequency <= 0:
+ # No autopacking
+ self._autopacker = None
+ else:
+ config = self._config
+ lastpacktime = U64(self._last_packtime())
+ self._autopacker = _Autopack(
+ self, config.frequency,
+ config.packtime, config.classicpack,
+ lastpacktime)
+ self._autopacker.start()
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
@@ -290,6 +308,16 @@
self.__nextvid = 0L
def close(self):
+ # We must stop the autopacker before closing any tables. BAW:
+ # should we use a timeout on the join() call? I'm not sure. On the
+ # one hand we don't want to block forever, but on the other, killing
+ # the autopacker thread in the middle of real work could leave the
+ # databases in a corrupted state, requiring recovery. With an
+ # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
+ if self._autopacker:
+ zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
+ self._autopacker.stop()
+ self._autopacker.join()
self._serials.close()
self._pickles.close()
self._refcounts.close()
@@ -482,10 +510,6 @@
self._txnMetadata.put(tid, data, txn=txn)
def _begin(self, tid, u, d, e):
- # DEBUGGING
- #self._nextserial += 1
- #self._serial = p64(self._nextserial)
- # END DEBUGGING
self._withtxn(self._dobegin, self._serial, u, d, e)
def _finish(self, tid, u, d, e):
@@ -1359,7 +1383,7 @@
# to pass that around to the helper methods, so just assert they're
# the same.
assert zreferencesf == referencesf
- zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
+ zLOG.LOG('Full storage', zLOG.INFO, 'classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time.
self._packlock.acquire()
@@ -1371,13 +1395,12 @@
self._dopack(t)
finally:
self._packlock.release()
- zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
+ zLOG.LOG('Full storage', zLOG.INFO, 'classic pack finished')
- def _dopack(self, t):
+ def _dopack(self, t, gc=True):
# t is a TimeTime, or time float, convert this to a TimeStamp object,
# using an algorithm similar to what's used in FileStorage. We know
- # that our transaction ids, a.k.a. revision ids, are timestamps. BAW:
- # This doesn't play nicely if you enable the `debugging tids'
+ # that our transaction ids, a.k.a. revision ids, are timestamps.
#
# BAW: should a pack time in the future be a ValueError? We'd have to
# worry about clock skew, so for now, we just set the pack time to the
@@ -1399,6 +1422,9 @@
self._withtxn(self._collect_objs)
finally:
self._lock_release()
+ # If we're not doing a classic pack, we're done.
+ if not gc:
+ return
# Do a mark and sweep for garbage collection. Calculate the set of
# objects reachable from the root. Anything else is a candidate for
# having all their revisions packed away. The set of reachable
@@ -1423,6 +1449,23 @@
finally:
self._lock_release()
+ def autopack(self, t, gc):
+ zLOG.LOG('Full storage', zLOG.INFO,
+ 'autopack started (packtime: %s, gc? %s)'
+ % (t, gc and 'yes' or 'no'))
+ # A simple wrapper around the bulk of packing, but which acquires a
+ # lock that prevents multiple packs from running at the same time.
+ self._packlock.acquire()
+ try:
+ # We don't wrap this in _withtxn() because we're going to do the
+ # operation across several Berkeley transactions, which allows
+ # other work to happen (stores and reads) while packing is being
+ # done.
+ self._dopack(t, gc)
+ finally:
+ self._packlock.release()
+ zLOG.LOG('Full storage', zLOG.INFO, 'autopack finished')
+
def _collect_revs(self, txn, packtid):
ct = co = None
try:
@@ -1826,3 +1869,42 @@
self.version = version
self.data = data
self.data_txn = data_txn
+
+
+
+class _Autopack(threading.Thread):
+ def __init__(self, storage, frequency, packtime, classicpack,
+ lastpacktime):
+ threading.Thread.__init__(self)
+ self._storage = storage
+ self._frequency = frequency
+ self._packtime = packtime
+ self._classicpack = classicpack
+ # Bookkeeping
+ self._stop = False
+ self._nextpack = lastpacktime + self._frequency
+ self._lastclassic = 0
+
+ def run(self):
+ zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
+ while not self._stop:
+ now = time.time()
+ if now > self._nextpack:
+ # Should we do a classic pack this time?
+ if self._classicpack <= 0:
+ classicp = False
+ else:
+ v = (self._lastclassic + 1) % self._classicpack
+ self._lastclassic = v
+ classicp = not v
+ # Run the autopack phase
+ self._storage.autopack(now - self._packtime, classicp)
+ self._nextpack = now + self._frequency
+ # Now we sleep for a little while before we check again. Sleep
+ # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so as
+ # to be as responsive as possible to .stop() calls.
+ time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
+ zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
+
+ def stop(self):
+ self._stop = True