[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - BerkeleyBase.py:1.30 Full.py:1.57 Minimal.py:1.21

Barry Warsaw barry@wooz.org
Fri, 6 Dec 2002 00:58:38 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv28652/bsddb3Storage

Modified Files:
	BerkeleyBase.py Full.py Minimal.py 
Log Message:
Rework the background threads to be poll-with-timeout based instead of
sleep-based.  We create a pipe in the parent thread, with the child
thread polling/reading one end and the parent writing to the other.
The only thing ever written to this pipe is a "stop marker" -- just a
string that wakes the thread up immediately when we're closing the
storage.
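
In essence the pattern is the following minimal, self-contained sketch
(the names here are illustrative rather than the storage's actual API;
a threading.Event stands in for the storage's _stop flag):

    import os
    import select
    import threading

    def make_stop_pipe():
        # The child polls/reads rfd; the parent writes a stop marker to wfd.
        rfd, wfd = os.pipe()
        poller = select.poll()
        poller.register(rfd, select.POLLIN)
        return poller, wfd

    def worker(poller, interval, stopped):
        while not stopped.isSet():
            # Block for at most `interval' seconds; poll() takes milliseconds.
            for fd, event in poller.poll(interval * 1000):
                if event & select.POLLIN:
                    os.read(fd, 1024)   # drain the stop marker

    poller, wfd = make_stop_pipe()
    stopped = threading.Event()
    t = threading.Thread(target=worker, args=(poller, 10, stopped))
    t.setDaemon(True)
    t.start()
    # ... later, at shutdown time:
    stopped.set()
    os.write(wfd, 'STOP')
    t.join()

The write is what makes shutdown immediate: without it, the worker
would only notice the flag after its current poll() timeout expired.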

The primary reason for this is to speed up shutdown: we no longer have
to wait for the next tick of the sleep counter before the thread
notices the shutdown request.  This makes shutting down Zope using
this storage much quicker.

Specific changes include:

BerkeleyBase.py

    SLEEP_TIME -> JOIN_TIME since it's now just the interval we'll
    wait for the thread.join() call to complete.

    __init__(): Create both the checkpointer thread and the autopacker
    thread, set up the pipes, and get them both rolling.  Creation of
    the autopacker instance is refactored into a separate overrideable
    method, since this is the one bit that differs between the two
    storages.

    _make_autopacker(): Intended to be overridden.

    close(), _doclose(): Move the thread shutdown code out of the
    lock, since we don't want shutting down the background threads to
    deadlock.  That could happen if close() were entered while the
    autopacker thread was between lock acquisitions in _dopack().
    Also, be sure to write to the pipe so that the child threads wake
    up immediately.
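
    Schematically, the hazard looks like this (a hypothetical
    reduction, not the storage's actual methods; `worker' stands in
    for the autopacker thread):

        import threading

        lock = threading.Lock()

        def close_wrong(worker):
            # BAD: joining while holding the lock.  If the worker is
            # blocked in lock.acquire() between two pack steps, it can
            # never finish, join() never returns, and close() deadlocks.
            lock.acquire()
            try:
                worker.stop()
                worker.join()
            finally:
                lock.release()

        def close_right(worker):
            # GOOD: signal and join the worker before taking the lock,
            # so it can complete (or skip) its current step and exit.
            worker.stop()
            worker.join()
            lock.acquire()
            try:
                pass  # now close the tables and the environment
            finally:
                lock.release()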

    env_from_string(): Wrap the actual creation and opening of the
    environment in a try/except, so that if there's a failure, we can
    be sure to give up the file lock.

    _WorkThread class: Accept a poll object which wraps the read end
    of the pipe.  Rework run() to do the poll-with-timeout instead of
    sleeping.  I don't think we strictly need to do the fd read, given
    the simplicity (and mono-commandity) of the protocol, but it can't
    hurt.  The _dowork() method's signature no longer contains the
    `now' variable.
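
    One detail worth keeping in mind when reading run(): select.poll()'s
    timeout argument is in milliseconds, while the check interval is
    kept in seconds, so the interval must be scaled when handed to
    poll():

        import select

        poller = select.poll()
        interval = 10                          # seconds, as configured
        events = poller.poll(interval * 1000)  # poll() wants milliseconds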

Full.py

    _make_autopacker(): Override base class to return storage specific
    _Autopack instance.

Minimal.py

    Same, but also include some code cleanup.


Also, get rid of some unnecessary imports.


=== ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py 1.29 => 1.30 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py:1.29	Tue Dec  3 14:13:42 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py	Fri Dec  6 00:58:38 2002
@@ -19,6 +19,7 @@
 import os
 import time
 import errno
+import select
 import threading
 from types import StringType
 
@@ -28,7 +29,6 @@
 
 # BaseStorage provides primitives for lock acquisition and release, and a host
 # of other methods, some of which are overridden here, some of which are not.
-from ZODB import POSException
 from ZODB.lock_file import lock_file
 from ZODB.BaseStorage import BaseStorage
 from ZODB.referencesf import referencesf
@@ -37,12 +37,11 @@
 
 GBYTES = 1024 * 1024 * 1000
 
-# Maximum number of seconds for background thread to sleep before checking to
-# see if it's time for another autopack run.  Lower numbers mean more
-# processing, higher numbers mean less responsiveness to shutdown requests.
-# 10 seconds seems like a good compromise.  Note that if the check interval is
-# less than the sleep time, the minimum will be used.
-SLEEP_TIME = 10
+# How long should we wait to join one of the background daemon threads?  It's
+# a good idea not to set this too short, or we could corrupt our database.
+# That would be recoverable, but recovery could take a long time too, so it's
+# better to shut down cleanly.
+JOIN_TIME = 10
 
 try:
     True, False
@@ -189,7 +188,6 @@
 
         # Instantiate a pack lock
         self._packlock = ThreadLock.allocate_lock()
-        self._autopacker = None
         self._stop = self._closed = False
         # Initialize a few other things
         self._prefix = prefix
@@ -199,11 +197,27 @@
         self._setupDBs()
         # Initialize the object id counter.
         self._init_oid()
+        # Set up the checkpointing thread
         if config.interval > 0:
-            self._checkpointer = _Checkpoint(self, config.interval)
+            r, self._checkpointfd = os.pipe()
+            poll = select.poll()
+            poll.register(r, select.POLLIN)
+            self._checkpointer = _Checkpoint(self, poll, config.interval)
             self._checkpointer.start()
         else:
             self._checkpointer = None
+        # Set up the autopacking thread
+        if config.frequency > 0:
+            r, self._autopackfd = os.pipe()
+            poll = select.poll()
+            poll.register(r, select.POLLIN)
+            self._autopacker = self._make_autopacker(poll)
+            self._autopacker.start()
+        else:
+            self._autopacker = None
+
+    def _make_autopacker(self, poll):
+        raise NotImplementedError
 
     def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
         """Open an individual database with the given flags.
@@ -321,9 +335,21 @@
         tables are closed, and finally the environment is force checkpointed
         and closed too.
         """
-        # Set this flag before acquiring the lock so we don't block waiting
-        # for the autopack thread to give up the lock.
+        # We have to shut down the background threads before we acquire the
+        # lock, or we could end up closing the environment before the
+        # autopacking thread exits.
         self._stop = True
+        # Stop the autopacker thread
+        if self._autopacker:
+            self.log('stopping autopacking thread')
+            self._autopacker.stop()
+            os.write(self._autopackfd, 'STOP')
+            self._autopacker.join(JOIN_TIME)
+        if self._checkpointer:
+            self.log('stopping checkpointing thread')
+            self._checkpointer.stop()
+            os.write(self._checkpointfd, 'STOP')
+            self._checkpointer.join(JOIN_TIME)
         self._lock_acquire()
         try:
             if not self._closed:
@@ -333,15 +359,6 @@
             self._lock_release()
 
     def _doclose(self):
-        # Stop the autopacker thread
-        if self._autopacker:
-            self.log('stopping autopacking thread')
-            self._autopacker.stop()
-            self._autopacker.join(SLEEP_TIME * 2)
-        if self._checkpointer:
-            self.log('stopping checkpointing thread')
-            self._checkpointer.stop()
-            self._checkpointer.join(SLEEP_TIME * 2)
         # Close all the tables
         for d in self._tables:
             d.close()
@@ -426,30 +443,36 @@
     lock_file(lockfile)
     lockfile.write(str(os.getpid()))
     lockfile.flush()
-    # Create, initialize, and open the environment
-    env = db.DBEnv()
-    if config.logdir is not None:
-        env.set_lg_dir(config.logdir)
-    gbytes, bytes = divmod(config.cachesize, GBYTES)
-    env.set_cachesize(gbytes, bytes)
-    env.open(envname,
-             db.DB_CREATE          # create underlying files as necessary
-             | db.DB_RECOVER       # run normal recovery before opening
-             | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
-             | db.DB_INIT_TXN      # initialize transaction subsystem
-             | db.DB_THREAD        # we use the environment from other threads
-             )
+    try:
+        # Create, initialize, and open the environment
+        env = db.DBEnv()
+        if config.logdir is not None:
+            env.set_lg_dir(config.logdir)
+        gbytes, bytes = divmod(config.cachesize, GBYTES)
+        env.set_cachesize(gbytes, bytes)
+        env.open(envname,
+                 db.DB_CREATE          # create underlying files as necessary
+                 | db.DB_RECOVER       # run normal recovery before opening
+                 | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
+                 | db.DB_INIT_TXN      # initialize transaction subsystem
+                 | db.DB_THREAD        # we use the env from multiple threads
+                 )
+    except:
+        lockfile.close()
+        raise
     return env, lockfile
 
 
 
 class _WorkThread(threading.Thread):
-    def __init__(self, storage, checkinterval, name='work'):
+    def __init__(self, storage, poll, checkinterval, name='work'):
         threading.Thread.__init__(self)
         self._storage = storage
+        self._poll = poll
         self._interval = checkinterval
         self._name = name
-        # Bookkeeping
+        # Bookkeeping.  _nextcheck is useful as a non-public interface aiding
+        # testing.  See test_autopack.py.
         self._stop = False
         self._nextcheck = checkinterval
         # We don't want these threads to hold up process exit.  That could
@@ -461,27 +484,34 @@
         self._storage.log('%s thread started', name)
         while not self._stop:
             now = time.time()
-            if now > self._nextcheck:
-                self._storage.log('running %s', name)
-                self._dowork(now)
-                self._nextcheck = now + self._interval
-            # Now we sleep for a little while before we check again.  Sleep
-            # for the minimum of self._interval and SLEEP_TIME so as to be as
-            # responsive as possible to .stop() calls.
-            time.sleep(min(self._interval, SLEEP_TIME))
+            if now >= self._nextcheck:
+                self._storage.log('running %s', name)
+                self._dowork()
+                self._nextcheck = now + self._interval
+            # Now we block for a little while before checking again.  We use
+            # a poll timeout so that when the parent thread writes its "stop
+            # marker" to the readfd, we'll wake up immediately.
+            for fd, event in self._poll.poll(self._interval * 1000):
+                # Just read and throw away the data.  The _stop flag will
+                # already have been set if we're being shut down.
+                if event & select.POLLIN:
+                    os.read(fd, 1024)
         self._storage.log('%s thread finished', name)
 
     def stop(self):
         self._stop = True
 
-    def _dowork(self, now):
+    def _dowork(self):
         pass
 
 
 
 class _Checkpoint(_WorkThread):
-    def __init__(self, storage, interval):
-        _WorkThread.__init__(self, storage, interval, 'checkpointing')
+    def __init__(self, storage, poll, interval):
+        _WorkThread.__init__(self, storage, poll, interval, 'checkpointing')
 
-    def _dowork(self, now):
+    def _dowork(self):
         self._storage.docheckpoint()


=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.56 => 1.57 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.56	Tue Dec  3 14:12:34 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py	Fri Dec  6 00:58:38 2002
@@ -17,9 +17,7 @@
 
 __version__ = '$Revision$'.split()[-2:][0]
 
-import sys
 import time
-import threading
 import cPickle as pickle
 from struct import pack, unpack
 
@@ -238,15 +236,14 @@
         self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
-        # Set up the autopacking thread
+
+    def _make_autopacker(self, poll):
         config = self._config
-        if config.frequency > 0:
-            lastpacktime = U64(self._last_packtime())
-            self._autopacker = _Autopack(
-                self, config.frequency,
-                config.packtime, config.classicpack,
-                lastpacktime)
-            self._autopacker.start()
+        lastpacktime = U64(self._last_packtime())
+        return _Autopack(
+            self, poll, config.frequency,
+            config.packtime, config.classicpack,
+            lastpacktime)
 
     def _dorecovery(self):
         # If these tables are non-empty, it means we crashed during a pack
@@ -1861,16 +1858,16 @@
 
 
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency, packtime, classicpack,
+    def __init__(self, storage, poll, frequency, packtime, classicpack,
                  lastpacktime):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+        _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
         self._packtime = packtime
         self._classicpack = classicpack
         # Bookkeeping
         self._stop = False
         self._lastclassic = 0
 
-    def _dowork(self, now):
+    def _dowork(self):
         # Should we do a classic pack this time?
         if self._classicpack <= 0:
             classicp = False
@@ -1879,4 +1876,4 @@
             self._lastclassic = v
             classicp = not v
         # Run the autopack phase
-        self._storage.autopack(now - self._packtime, classicp)
+        self._storage.autopack(time.time() - self._packtime, classicp)


=== ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py 1.20 => 1.21 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py:1.20	Tue Dec  3 14:10:09 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py	Fri Dec  6 00:58:38 2002
@@ -17,16 +17,13 @@
 
__version__ = '$Revision$'.split()[-2:][0]
 
-import time
-import threading
-
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net.  It is compatible with release 3.4 of
 # PyBSDDB3.
 from bsddb3 import db
 
 from ZODB import POSException
-from ZODB.utils import U64, p64
+from ZODB.utils import p64, U64
 from ZODB.referencesf import referencesf
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
 
@@ -40,12 +37,6 @@
 PRESENT = 'X'
 ZERO = '\0'*8
 
-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run.  Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests.  10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
 try:
     True, False
 except NameError:
@@ -131,11 +122,9 @@
                     self._withtxn(self._docommit, tid)
             finally:
                 self._lock_release()
-        # Set up the autopacking thread
-        if self._config.frequency > 0:
-            config = self._config
-            self._autopacker = _Autopack(self, config.frequency)
-            self._autopacker.start()
+
+    def _make_autopacker(self, poll):
+        return _Autopack(self, poll, self._config.frequency)
 
     def _doabort(self, txn, tid):
         co = cs = None
@@ -548,9 +537,9 @@
 
 
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+    def __init__(self, storage, poll, frequency):
+        _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
 
-    def _dowork(self, now):
+    def _dowork(self):
         # Run the autopack phase
         self._storage.pack('ignored', referencesf)