[Zodb-checkins] CVS: ZODB4/BDBStorage - BDBFullStorage.py:2.2 BDBMinimalStorage.py:2.1 BerkeleyBase.py:2.2
Barry Warsaw
barry@wooz.org
Fri, 6 Dec 2002 15:41:30 -0500
Update of /cvs-repository/ZODB4/BDBStorage
In directory cvs.zope.org:/tmp/cvs-serv28835/BDBStorage
Modified Files:
BDBFullStorage.py BDBMinimalStorage.py BerkeleyBase.py
Log Message:
Port changes from ZODB3
Simplify the _WorkThread API again. Don't use a non-portable poll
object -- because we care about just one event, we can simply use a
threading.Event object. Specific changes include:
BerkeleyBase.py
__init__(): Simplify the creation of the checkpointer and
autopacker worker threads.
close(): Replace autopacker.stop() with setting the Event object.
This both kicks us out of the wait() and sets the thread's
internal stop flag, so it's all we need.
_WorkThread.__init__(): Take the `name' argument out of the
constructor. It was the only thing that 2/3rds of the subclasses
needed to override, so just stick it in a class attribute (NAME).
run(): Simplify to use the Event object. Also, change _nextcheck
to recalculate `now' after the work is done. There's no telling
how much time the work will take (it may not matter much in
practice).
stop(): Removed.
_Checkpoint.__init__(): Removed
BDBFullStorage.py
_make_autopacker(): Updated
_Autopack.__init__(): Updated
BDBMinimalStorage.py
_make_autopacker(): Updated
_Autopack.__init__(): Removed
=== ZODB4/BDBStorage/BDBFullStorage.py 2.1 => 2.2 ===
--- ZODB4/BDBStorage/BDBFullStorage.py:2.1 Wed Dec 4 14:54:49 2002
+++ ZODB4/BDBStorage/BDBFullStorage.py Fri Dec 6 15:41:30 2002
@@ -17,9 +17,7 @@
__version__ = '$Revision$'.split()[-2:][0]
-import sys
import time
-import threading
import cPickle as pickle
from struct import pack, unpack
@@ -225,15 +223,14 @@
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
- # Set up the autopacking thread
+
+ def _make_autopacker(self, event):
config = self._config
- if config.frequency > 0:
- lastpacktime = u64(self._last_packtime())
- self._autopacker = _Autopack(
- self, config.frequency,
- config.packtime, config.classicpack,
- lastpacktime)
- self._autopacker.start()
+ lastpacktime = u64(self._last_packtime())
+ return _Autopack(
+ self, event,
+ config.frequency, config.packtime, config.classicpack,
+ lastpacktime)
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
@@ -1798,7 +1795,10 @@
self.status = ' '
self.user = user
self.description = desc
- self._extension = ext
+ try:
+ self._extension = pickle.loads(ext)
+ except EOFError:
+ self._extension = {}
# Internal pointer
self._oids = self._storage._alltxnoids(self.tid)
# To make .pop() more efficient
@@ -1840,16 +1840,18 @@
class _Autopack(_WorkThread):
- def __init__(self, storage, frequency, packtime, classicpack,
+ NAME = 'autopacking'
+
+ def __init__(self, storage, event,
+ frequency, packtime, classicpack,
lastpacktime):
- _WorkThread.__init__(self, storage, frequency, 'autopacking')
+ _WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime
self._classicpack = classicpack
# Bookkeeping
- self._stop = False
self._lastclassic = 0
- def _dowork(self, now):
+ def _dowork(self):
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
@@ -1858,4 +1860,4 @@
self._lastclassic = v
classicp = not v
# Run the autopack phase
- self._storage.autopack(now - self._packtime, classicp)
+ self._storage.autopack(time.time() - self._packtime, classicp)
=== ZODB4/BDBStorage/BDBMinimalStorage.py 2.0 => 2.1 ===
--- ZODB4/BDBStorage/BDBMinimalStorage.py:2.0 Wed Dec 4 14:42:20 2002
+++ ZODB4/BDBStorage/BDBMinimalStorage.py Fri Dec 6 15:41:30 2002
@@ -17,16 +17,13 @@
__version__ = '$Revision$'[-2:][0]
-import time
-import threading
-
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
# PyBSDDB3.
from bsddb3 import db
from ZODB import POSException
-from ZODB.utils import u64, p64
+from ZODB.utils import p64, u64
from ZODB.Serialize import findrefs
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
@@ -40,12 +37,6 @@
PRESENT = 'X'
ZERO = '\0'*8
-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run. Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests. 10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
class BDBMinimalStorage(BerkeleyBase, ConflictResolvingStorage):
@@ -125,11 +116,9 @@
self._withtxn(self._docommit, tid)
finally:
self._lock_release()
- # Set up the autopacking thread
- if self._config.frequency > 0:
- config = self._config
- self._autopacker = _Autopack(self, config.frequency)
- self._autopacker.start()
+
+ def _make_autopacker(self, event):
+ return _Autopack(self, event, self._config.frequency)
def _doabort(self, txn, tid):
co = cs = None
@@ -532,9 +521,8 @@
class _Autopack(_WorkThread):
- def __init__(self, storage, frequency):
- _WorkThread.__init__(self, storage, frequency, 'autopacking')
+ NAME = 'autopacking'
- def _dowork(self, now):
+ def _dowork(self):
# Run the autopack phase
self._storage.pack('ignored')
=== ZODB4/BDBStorage/BerkeleyBase.py 2.1 => 2.2 ===
--- ZODB4/BDBStorage/BerkeleyBase.py:2.1 Wed Dec 4 17:40:26 2002
+++ ZODB4/BDBStorage/BerkeleyBase.py Fri Dec 6 15:41:30 2002
@@ -28,7 +28,6 @@
# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
-from ZODB import POSException
from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.Serialize import findrefs
@@ -36,12 +35,11 @@
GBYTES = 1024 * 1024 * 1000
-# Maximum number of seconds for background thread to sleep before checking to
-# see if it's time for another autopack run. Lower numbers mean more
-# processing, higher numbers mean less responsiveness to shutdown requests.
-# 10 seconds seems like a good compromise. Note that if the check interval is
-# less than the sleep time, the minimum will be used.
-SLEEP_TIME = 10
+# How long should we wait to join one of the background daemon threads? It's
+# a good idea to not set this too short, or we could corrupt our database.
+# That would be recoverable, but recovery could take a long time too, so it's
+# better to shutdown cleanly.
+JOIN_TIME = 10
@@ -182,7 +180,6 @@
# Instantiate a pack lock
self._packlock = threading.Lock()
- self._autopacker = None
self._stop = self._closed = False
# Initialize a few other things
self._prefix = prefix
@@ -192,11 +189,23 @@
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
+ # Set up the checkpointing thread
if config.interval > 0:
- self._checkpointer = _Checkpoint(self, config.interval)
+ self._checkpointstop = event = threading.Event()
+ self._checkpointer = _Checkpoint(self, event, config.interval)
self._checkpointer.start()
else:
self._checkpointer = None
+ # Set up the autopacking thread
+ if config.frequency > 0:
+ self._autopackstop = event = threading.Event()
+ self._autopacker = self._make_autopacker(event)
+ self._autopacker.start()
+ else:
+ self._autopacker = None
+
+ def _make_autopacker(self, event):
+ raise NotImplementedError
def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
"""Open an individual database with the given flags.
@@ -314,9 +323,21 @@
tables are closed, and finally the environment is force checkpointed
and closed too.
"""
- # Set this flag before acquiring the lock so we don't block waiting
- # for the autopack thread to give up the lock.
+ # We have to shutdown the background threads before we acquire the
+ # lock, or we could end up closing the environment before the
+ # autopacking thread exits.
self._stop = True
+ # Stop the autopacker thread
+ if self._autopacker:
+ self.log('stopping autopacking thread')
+ # Setting the event also toggles the stop flag
+ self._autopackstop.set()
+ self._autopacker.join(JOIN_TIME)
+ if self._checkpointer:
+ self.log('stopping checkpointing thread')
+ # Setting the event also toggles the stop flag
+ self._checkpointstop.set()
+ self._checkpointer.join(JOIN_TIME)
self._lock_acquire()
try:
if not self._closed:
@@ -326,15 +347,6 @@
self._lock_release()
def _doclose(self):
- # Stop the autopacker thread
- if self._autopacker:
- self.log('stopping autopacking thread')
- self._autopacker.stop()
- self._autopacker.join(SLEEP_TIME * 2)
- if self._checkpointer:
- self.log('stopping checkpointing thread')
- self._checkpointer.stop()
- self._checkpointer.join(SLEEP_TIME * 2)
# Close all the tables
for d in self._tables:
d.close()
@@ -417,30 +429,37 @@
lock_file(lockfile)
lockfile.write(str(os.getpid()))
lockfile.flush()
- # Create, initialize, and open the environment
- env = db.DBEnv()
- if config.logdir is not None:
- env.set_lg_dir(config.logdir)
- gbytes, bytes = divmod(config.cachesize, GBYTES)
- env.set_cachesize(gbytes, bytes)
- env.open(envname,
- db.DB_CREATE # create underlying files as necessary
- | db.DB_RECOVER # run normal recovery before opening
- | db.DB_INIT_MPOOL # initialize shared memory buffer pool
- | db.DB_INIT_TXN # initialize transaction subsystem
- | db.DB_THREAD # we use the environment from other threads
- )
+ try:
+ # Create, initialize, and open the environment
+ env = db.DBEnv()
+ if config.logdir is not None:
+ env.set_lg_dir(config.logdir)
+ gbytes, bytes = divmod(config.cachesize, GBYTES)
+ env.set_cachesize(gbytes, bytes)
+ env.open(envname,
+ db.DB_CREATE # create underlying files as necessary
+ | db.DB_RECOVER # run normal recovery before opening
+ | db.DB_INIT_MPOOL # initialize shared memory buffer pool
+ | db.DB_INIT_TXN # initialize transaction subsystem
+ | db.DB_THREAD # we use the env from multiple threads
+ )
+ except:
+ lockfile.close()
+ raise
return env, lockfile
class _WorkThread(threading.Thread):
- def __init__(self, storage, checkinterval, name='work'):
+ NAME = 'worker'
+
+ def __init__(self, storage, event, checkinterval):
threading.Thread.__init__(self)
self._storage = storage
+ self._event = event
self._interval = checkinterval
- self._name = name
- # Bookkeeping
+ # Bookkeeping. _nextcheck is useful as a non-public interface aiding
+ # testing. See test_autopack.py.
self._stop = False
self._nextcheck = checkinterval
# We don't want these threads to hold up process exit. That could
@@ -448,31 +467,28 @@
self.setDaemon(True)
def run(self):
- name = self._name
+ name = self.NAME
self._storage.log('%s thread started', name)
while not self._stop:
now = time.time()
- if now > self._nextcheck:
+ if now >= self._nextcheck:
self._storage.log('running %s', name)
- self._dowork(now)
- self._nextcheck = now + self._interval
- # Now we sleep for a little while before we check again. Sleep
- # for the minimum of self._interval and SLEEP_TIME so as to be as
- # responsive as possible to .stop() calls.
- time.sleep(min(self._interval, SLEEP_TIME))
+ self._dowork()
+ # Recalculate `now' because _dowork() could have taken a
+ # while. time.time() can be expensive, but oh well.
+ self._nextcheck = time.time() + self._interval
+ # Block w/ timeout on the shutdown event.
+ self._event.wait(self._interval)
+ self._stop = self._event.isSet()
self._storage.log('%s thread finished', name)
- def stop(self):
- self._stop = True
-
- def _dowork(self, now):
+ def _dowork(self):
pass
class _Checkpoint(_WorkThread):
- def __init__(self, storage, interval):
- _WorkThread.__init__(self, storage, interval, 'checkpointing')
+ NAME = 'checkpointing'
- def _dowork(self, now):
+ def _dowork(self):
self._storage.docheckpoint()