[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - Minimal.py:1.16

Barry Warsaw <barry@wooz.org>
Tue, 19 Nov 2002 15:34:57 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv1468

Modified Files:
	Minimal.py 
Log Message:
_setupDBs(): Use the new extended _setupDB() arguments for the
DB_QUEUE tables.
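
For reference, the new call site in the diff below is
self._setupDB('oidqueue', 0, db.DB_QUEUE, 8).  Here is a minimal
sketch of what the extended _setupDB() in BerkeleyBase presumably
looks like; the parameter names and the _tables bookkeeping list are
assumptions -- only the call site and the old hand-rolled queue setup
are confirmed by this diff:

    def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
        """Open one table, remembering it so close() can find it."""
        d = db.DB(self._env)
        if flags:
            d.set_flags(flags)
        if reclen is not None:
            # DB_QUEUE tables require a fixed record length.
            d.set_re_len(reclen)
        d.open(self._prefix + name, dbtype, db.DB_CREATE)
        # Hypothetical list consulted by the base-class close().
        self._tables.append(d)
        return d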

close(): We can simplify this method and make it more robust against
future table additions by relying on the fact that the base class
maintains its own list of opened tables and that its close() method
closes each of them in turn.
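
Under that assumption, the base-class close() reduces to something
like the following sketch (the _tables attribute name is hypothetical;
the log message only confirms that the base class tracks and closes
its opened tables):

    def close(self):
        # Close every table opened through _setupDB(), then the
        # environment itself.
        for d in self._tables:
            d.close()
        self._env.close()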

autopack(): Default the gc argument to False.
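
That is, automatic packs no longer perform garbage collection unless
explicitly asked to; assuming a signature along these lines (the other
parameter is a guess):

    def autopack(self, now, gc=False):
        # gc=False: reclaim old object revisions only; cyclic garbage
        # collection happens only when the caller requests it.
        pass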

_collect_objs(), _mark(), _sweep(): Add an escape hatch for the pack
operation inside the inner loops of each of these methods.  That way,
if the main thread has requested a stop(), we can exit the pack
operation without waiting for the loops to finish.
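
The hatch is the recurring pattern visible in the hunks below: each
inner loop polls the storage's _stop flag and raises PackStop, which
the surrounding pack machinery presumably catches and treats as a
clean abort (the handler itself is not shown in this diff).
Paraphrased from the _sweep() hunk -- the cursor setup falls outside
the hunk, so the table name here is a guess:

    def _sweep(self, txn):
        c = self._serials.cursor(txn=txn)
        try:
            rec = c.first()
            while rec:
                if self._stop:
                    # Bail out mid-pack; close() sets self._stop = True
                    # and the pack machinery unwinds via PackStop.
                    raise PackStop, 'stopped in _sweep()'
                oid = rec[0]
                rec = c.next()
                # ... decide whether oid is root-reachable ...
        finally:
            c.close()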

_Autopack: Use the _WorkThread base class.
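
Judging from the _Autopack code deleted below, _WorkThread presumably
factors the generic run/stop loop out into BerkeleyBase, leaving
subclasses to override only _dowork().  A sketch reconstructed from
that deleted code, relying on the module's existing threading and time
imports (the real class lives in BerkeleyBase.py and may differ in
detail):

    class _WorkThread(threading.Thread):
        def __init__(self, storage, frequency, name):
            threading.Thread.__init__(self)
            self._storage = storage
            self._frequency = frequency
            self._name = name
            self._stop = False
            self._nextwork = 0

        def run(self):
            while not self._stop:
                now = time.time()
                if now > self._nextwork:
                    # Subclasses implement the actual work here.
                    self._dowork(now)
                    self._nextwork = now + self._frequency
                # Sleep in small increments so .stop() calls are
                # noticed promptly.
                time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))

        def stop(self):
            self._stop = True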


=== ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py 1.15 => 1.16 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py:1.15	Mon Nov 11 17:06:22 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py	Tue Nov 19 15:34:57 2002
@@ -25,16 +25,17 @@
 # PyBSDDB3.
 from bsddb3 import db
 
-# BerkeleyBase class provides some common functionality for BerkeleyDB-based
-# storages.  It in turn inherits from BaseStorage which itself provides some
-# common storage functionality.
-from BerkeleyBase import BerkeleyBase
 from ZODB import POSException
 from ZODB.utils import U64, p64
 from ZODB.referencesf import referencesf
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
 import zLOG
 
+# BerkeleyBase class provides some common functionality for BerkeleyDB-based
+# storages.  It in turn inherits from BaseStorage which itself provides some
+# common storage functionality.
+from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
+
 ABORT = 'A'
 COMMIT = 'C'
 PRESENT = 'X'
@@ -112,11 +113,7 @@
         self._pending = self._setupDB('pending')
         # Tables to support packing.
         self._packmark = self._setupDB('packmark')
-        self._oidqueue = db.DB(self._env)
-        self._oidqueue.set_re_len(8)
-        # BAW: do we need to set the queue extent size?
-        self._oidqueue.open(self._prefix + 'oidqueue',
-                            db.DB_QUEUE, db.DB_CREATE)
+        self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         pendings = self._pending.keys()
         assert len(pendings) <= 1
@@ -142,24 +139,23 @@
             self._autopacker.start()
 
     def close(self):
-        # We must stop the autopacker first before closing any tables.  BAW:
-        # should we use a timeout on the join() call?  I'm not sure.  On the
-        # one hand we don't want to block forever, but on the other, killing
-        # the autopacker thread in the middle of real work could leave the
-        # databases in a corrupted state, requiring recovery.  With a
-        # AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
-        if self._autopacker:
-            zLOG.LOG('Minimal storage', zLOG.INFO, 'stopping autopack thread')
-            self._autopacker.stop()
-            self._autopacker.join()
-        self._serials.close()
-        self._pickles.close()
-        self._refcounts.close()
-        self._oids.close()
-        self._pending.close()
-        self._packmark.close()
-        self._oidqueue.close()
-        BerkeleyBase.close(self)
+        # Set this flag before acquiring the lock so we don't block waiting
+        # for the autopack thread to give up the lock.
+        self._stop = True
+        self._lock_acquire()
+        try:
+            # We must stop the autopacker and checkpointing threads before
+            # closing any tables.  I'm not sure about the join() timeout,
+            # but I'd be surprised if any particular iteration of the
+            # pack-related loops took longer than a few seconds.
+            if self._autopacker:
+                zLOG.LOG('Minimal storage', zLOG.INFO,
+                         'stopping autopack thread')
+                self._autopacker.stop()
+                self._autopacker.join(30)
+            BerkeleyBase.close(self)
+        finally:
+            self._lock_release()
 
     def _doabort(self, txn, tid):
         co = cs = None
@@ -440,6 +436,8 @@
         # root references, and then for each of those, find all the objects it
         # references, and so on until we've traversed the entire object graph.
         while oid:
+            if self._stop:
+                raise PackStop, 'stopped in _mark()'
             if not self._packmark.has_key(oid):
                 # We haven't yet seen this object
                 self._packmark.put(oid, PRESENT, txn=txn)
@@ -467,6 +465,8 @@
         try:
             rec = c.first()
             while rec:
+                if self._stop:
+                    raise PackStop, 'stopped in _sweep()'
                 oid = rec[0]
                 rec = c.next()
                 # If packmark (which knows about all the root reachable
@@ -482,6 +482,8 @@
     def _collect_objs(self, txn):
         orec = self._oidqueue.consume()
         while orec:
+            if self._stop:
+                raise PackStop, 'stopped in _collect_objs()'
             oid = orec[1]
             serial = self._getCurrentSerial(oid)
             # Delete the object from the serials table
@@ -492,6 +494,8 @@
                 except db.DBNotFoundError:
                     rec = None
                 while rec and rec[0] == oid:
+                    if self._stop:
+                        raise PackStop, 'stopped in _collect_objs() loop 1'
                     c.delete()
                     rec = c.next_dup()
                 # We don't need the refcounts any more, but note that if the
@@ -511,6 +515,8 @@
                 except db.DBNotFoundError:
                     rec = None
                 while rec and rec[0][:8] == oid:
+                    if self._stop:
+                        raise PackStop, 'stopped in _collect_objs() loop 2'
                     data = rec[1]
                     c.delete()
                     rec = c.next()
@@ -555,28 +561,10 @@
 
 
 
-class _Autopack(threading.Thread):
+class _Autopack(_WorkThread):
     def __init__(self, storage, frequency):
-        threading.Thread.__init__(self)
-        self._storage = storage
-        self._frequency = frequency
-        # Bookkeeping
-        self._stop = False
-        self._nextpack = 0
-
-    def run(self):
-        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread started')
-        while not self._stop:
-            now = time.time()
-            if now > self._nextpack:
-                # Run the autopack phase
-                self._storage.pack('ignored', referencesf)
-                self._nextpack = now + self._frequency
-            # Now we sleep for a little while before we check again.  Sleep
-            # for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so
-            # as to be as responsive as possible to .stop() calls.
-            time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
-        zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread finished')
+        _WorkThread.__init__(self, storage, frequency, 'autopacking')
 
-    def stop(self):
-        self._stop = True
+    def _dowork(self, now):
+        # Run the autopack phase
+        self._storage.pack('ignored', referencesf)