[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Full.py:1.48

Barry Warsaw barry@wooz.org
Fri, 8 Nov 2002 18:17:16 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv3681

Modified Files:
	Full.py 
Log Message:
Support for autopacking in a separate thread.  BerkeleyConfig now has
three new configuration variables for controlling how autopacking
works.  Basically, you set an autopack frequency, a "packtime" -- the
point in the past you want to pack to -- and a counter controlling how
often an autopack run is promoted to a classic pack (i.e. one that also
does garbage collection).
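
A rough sketch of what that configuration looks like (illustrative
only -- the stand-in class and the values are assumptions; just the
attribute names frequency, packtime, and classicpack come from this
change):

    # Hypothetical sketch: a stand-in for BerkeleyConfig showing the
    # three new attributes.  Only the attribute names are taken from how
    # Full.py reads them below; the defaults and values are made up.
    class BerkeleyConfig:
        frequency = 0      # seconds between autopack runs; <= 0 disables autopacking
        packtime = 0       # pack to a point this many seconds in the past
        classicpack = 0    # every Nth autopack run also does a classic (gc) pack

    config = BerkeleyConfig()
    config.frequency = 60 * 60       # autopack once an hour
    config.packtime = 4 * 60 * 60    # pack to a point four hours in the past
    config.classicpack = 24          # roughly one classic pack per day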

Specific changes here include:

_setupDBs(): If autopacking is enabled, create the autopacking thread
object and get it started.

close(): When shutting down the storage, we need to stop and join the
autopacking thread; otherwise I think we run a high risk of corrupting
our database (requiring recovery).

_dopack(): Add a flag for whether full gc should be done or not.
That's about the only difference between classic pack and autopack
(the latter does not do gc).

autopack(): The method that the autopacking thread calls to start an
autopack.  It takes a pack time with the same semantics as pack(), but
it also takes a flag specifying whether to do garbage collection of
unreachable objects or not.

_Autopack: A class derived from threading.Thread to handle the
background autopacking.
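
For context, a hypothetical end-to-end usage sketch -- the import path
and constructor arguments are guesses based on this file (Full passes
name, env, prefix, config through to BerkeleyBase.__init__), not a
verified API:

    # Hypothetical usage sketch; argument names, values, and the import
    # path are assumptions.  `config` is the BerkeleyConfig-style object
    # sketched above.
    from bsddb3Storage.Full import Full

    storage = Full('full', env='/tmp/zodb-env', config=config)
    try:
        # Normal stores and loads happen here while the _Autopack thread
        # packs in the background on the configured schedule.
        pass
    finally:
        # close() stops and joins the autopack thread before closing tables.
        storage.close()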


=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.47 => 1.48 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.47	Fri Nov  8 14:35:51 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py	Fri Nov  8 18:17:15 2002
@@ -19,6 +19,7 @@
 
 import sys
 import time
+import threading
 import cPickle as pickle
 from struct import pack, unpack
 
@@ -51,6 +52,12 @@
 # DEBUGGING
 #DNE = 'nonexist'
 
+# Number of seconds for the autopack thread to sleep before checking to see if
+# it's time for another autopack run.  Lower numbers mean more processing,
+# higher numbers mean less responsiveness to shutdown requests.  10 seconds
+# seems like a good compromise.
+AUTOPACK_CHECK_SLEEP = 10
+
 try:
     # Python 2.2
     from _helper import incr
@@ -79,6 +86,8 @@
         """
         self._packlock = ThreadLock.allocate_lock()
         BerkeleyBase.__init__(self, name, env, prefix, config)
+        # The autopack thread is started in _setupDBs() because it needs the
+        # last pack time, which is stored in one of the tables.
 
     def _setupDBs(self):
         # Data Type Assumptions:
@@ -252,9 +261,18 @@
                             db.DB_QUEUE, db.DB_CREATE)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
-        # DEBUGGING
-        #self._nextserial = 0L
-        # END DEBUGGING
+        # Set up the autopacking thread
+        if self._config.frequency <= 0:
+            # No autopacking
+            self._autopacker = None
+        else:
+            config = self._config
+            lastpacktime = U64(self._last_packtime())
+            self._autopacker = _Autopack(
+                self, config.frequency,
+                config.packtime, config.classicpack,
+                lastpacktime)
+            self._autopacker.start()
 
     def _dorecovery(self):
         # If these tables are non-empty, it means we crashed during a pack
@@ -290,6 +308,16 @@
             self.__nextvid = 0L
 
     def close(self):
+        # We must stop the autopacker before closing any tables.  BAW:
+        # should we use a timeout on the join() call?  I'm not sure.  On the
+        # one hand we don't want to block forever, but on the other, killing
+        # the autopacker thread in the middle of real work could leave the
+        # databases in a corrupted state, requiring recovery.  With an
+        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
+        if self._autopacker:
+            zLOG.LOG('Full storage', zLOG.INFO, 'stopping autopack thread')
+            self._autopacker.stop()
+            self._autopacker.join()
         self._serials.close()
         self._pickles.close()
         self._refcounts.close()
@@ -482,10 +510,6 @@
         self._txnMetadata.put(tid, data, txn=txn)
 
     def _begin(self, tid, u, d, e):
-        # DEBUGGING
-        #self._nextserial += 1
-        #self._serial = p64(self._nextserial)
-        # END DEBUGGING
         self._withtxn(self._dobegin, self._serial, u, d, e)
 
     def _finish(self, tid, u, d, e):
@@ -1359,7 +1383,7 @@
         # to pass that around to the helper methods, so just assert they're
         # the same.
         assert zreferencesf == referencesf
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack started')
+        zLOG.LOG('Full storage', zLOG.INFO, 'classic pack started')
         # A simple wrapper around the bulk of packing, but which acquires a
         # lock that prevents multiple packs from running at the same time.
         self._packlock.acquire()
@@ -1371,13 +1395,12 @@
             self._dopack(t)
         finally:
             self._packlock.release()
-        zLOG.LOG('Full storage', zLOG.INFO, 'pack done')
+        zLOG.LOG('Full storage', zLOG.INFO, 'classic pack finished')
 
-    def _dopack(self, t):
+    def _dopack(self, t, gc=True):
         # t is a TimeTime, or time float, convert this to a TimeStamp object,
         # using an algorithm similar to what's used in FileStorage.  We know
-        # that our transaction ids, a.k.a. revision ids, are timestamps.  BAW:
-        # This doesn't play nicely if you enable the `debugging tids'
+        # that our transaction ids, a.k.a. revision ids, are timestamps.
         #
         # BAW: should a pack time in the future be a ValueError?  We'd have to
         # worry about clock skew, so for now, we just set the pack time to the
@@ -1399,6 +1422,9 @@
             self._withtxn(self._collect_objs)
         finally:
             self._lock_release()
+        # If we're not doing a classic pack, we're done.
+        if not gc:
+            return
         # Do a mark and sweep for garbage collection.  Calculate the set of
         # objects reachable from the root.  Anything else is a candidate for
         # having all their revisions packed away.  The set of reachable
@@ -1423,6 +1449,23 @@
         finally:
             self._lock_release()
 
+    def autopack(self, t, gc):
+        zLOG.LOG('Full storage', zLOG.INFO,
+                 'autopack started (packtime: %s, gc? %s)'
+                 % (t, gc and 'yes' or 'no'))
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            # We don't wrap this in _withtxn() because we're going to do the
+            # operation across several Berkeley transactions, which allows
+            # other work to happen (stores and reads) while packing is being
+            # done.
+            self._dopack(t, gc)
+        finally:
+            self._packlock.release()
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack finished')
+
     def _collect_revs(self, txn, packtid):
         ct = co = None
         try:
@@ -1826,3 +1869,42 @@
         self.version = version
         self.data = data
         self.data_txn = data_txn
+
+
+
+class _Autopack(threading.Thread):
+    def __init__(self, storage, frequency, packtime, classicpack,
+                 lastpacktime):
+        threading.Thread.__init__(self)
+        self._storage = storage
+        self._frequency = frequency
+        self._packtime = packtime
+        self._classicpack = classicpack
+        # Bookkeeping
+        self._stop = False
+        self._nextpack = lastpacktime + self._frequency
+        self._lastclassic = 0
+
+    def run(self):
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread started')
+        while not self._stop:
+            now = time.time()
+            if now > self._nextpack:
+                # Should we do a classic pack this time?
+                if self._classicpack <= 0:
+                    classicp = False
+                else:
+                    v = (self._lastclassic + 1) % self._classicpack
+                    self._lastclassic = v
+                    classicp = not v
+                # Run the autopack phase
+                self._storage.autopack(now - self._packtime, classicp)
+                self._nextpack = now + self._frequency
+            # Now we sleep for a little while before we check again.  Sleep
+            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so
+            # as to be as responsive as possible to .stop() calls.
+            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
+        zLOG.LOG('Full storage', zLOG.INFO, 'autopack thread finished')
+
+    def stop(self):
+        self._stop = True