[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - BerkeleyBase.py:1.23
Barry Warsaw
barry@wooz.org
Tue, 19 Nov 2002 14:48:09 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv28284
Modified Files:
BerkeleyBase.py
Log Message:
Based on suggestions by Toby, checkpointing is now done in a background
thread instead of after every nth Berkeley transaction commit or abort.
This module also provides a common base class for both the checkpointing
and autopacking threads. Changes here include:
SLEEP_TIME: the maximum number of seconds a background thread sleeps
before waking up to see if there's work to do.
True, False: Add these for older Pythons.
PackStop: New class that acts as an escape hatch for pack operations.
BerkeleyConfig:
- change the default interval from 100 to 0. Now that interval controls
the checkpointing thread, the default is to not start the thread at
all. I'm probably going to change this once I figure out what a good
value is.
BerkeleyBase:
- __init__(): Start the checkpointing thread if interval > 0
- _setupDB(): Add dbtype and reclen keyword args so that the QUEUE
style tables can use this convenience method too.
- close(): Be sure to stop and join the checkpointing thread
- _docheckpoint(): Removed
- _withtxn(): Catch the PackStop escape hatch exception. In that case
the current Berkeley transaction is aborted, but the exception is
swallowed instead of re-raised. Also, don't call _docheckpoint() here.
- docheckpoint(): New method which the checkpointing threads can
call.
env_from_string(): use DB_RECOVER_FATAL for autorecovery on open.
_WorkThread: New base class for the checkpointing and autopacking
threads.
_Checkpoint: The checkpointing thread class, a _WorkThread subclass.
=== ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py 1.22 => 1.23 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py:1.22 Mon Nov 11 15:56:30 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py Tue Nov 19 14:48:08 2002
@@ -14,9 +14,12 @@
"""Base class for BerkeleyStorage implementations.
"""
+__version__ = '$Revision$'.split()[-2:][0]
import os
+import time
import errno
+import threading
from types import StringType
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
@@ -30,10 +33,27 @@
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
import ThreadLock
+import zLOG
GBYTES = 1024 * 1024 * 1000
-__version__ = '$Revision$'.split()[-2:][0]
+# Maximum number of seconds a background work thread (e.g. the checkpointing
+# or autopacking thread) sleeps before checking whether it is time to do its
+# work again.  Lower numbers mean more processing, higher numbers mean less
+# responsiveness to shutdown requests.  10 seconds seems like a good
+# compromise.  Note that if a thread's check interval is less than the sleep
+# time, the check interval is used as the sleep time instead.
+SLEEP_TIME = 10
+
+# Provide True/False on older Pythons that don't have them built in.
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+
+
+class PackStop(Exception):
+ """Escape hatch for pack operations."""
@@ -53,9 +73,10 @@
The following checkpointing attributes are supported:
- - interval indicates the approximate number of Berkeley transaction
- commits and aborts after which a checkpoint is performed. Berkeley
- transactions are performed after ZODB aborts, commits, and stores.
+ - interval indicates how often, in seconds, a Berkeley checkpoint is
+ performed. If this is non-zero, checkpointing is performed by a
+ background thread. Otherwise checkpointing will only be done when the
+ storage is closed. You really want to enable checkpointing. ;)
- kbytes is passed directly to txn_checkpoint()
@@ -98,7 +119,7 @@
never automatically do classic packs. For Minimal storage, this value
is ignored -- all packs are classic packs.
"""
- interval = 100
+ interval = 0
kbyte = 0
min = 0
logdir = None
@@ -142,12 +163,10 @@
Optional config must be a BerkeleyConfig instance, or None, which
means to use the default configuration options.
"""
-
# sanity check arguments
if config is None:
config = BerkeleyConfig()
self._config = config
- self._config._counter = 0
if name == '':
raise TypeError, 'database name is empty'
@@ -167,24 +186,36 @@
# Instantiate a pack lock
self._packlock = ThreadLock.allocate_lock()
self._autopacker = None
+ self._stop = False
# Initialize a few other things
self._prefix = prefix
# Give the subclasses a chance to interpose into the database setup
# procedure
+ self._tables = []
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
+ if config.interval > 0:
+ self._checkpointer = _Checkpoint(self, config.interval)
+ self._checkpointer.start()
+ else:
+ self._checkpointer = None
- def _setupDB(self, name, flags=0):
+ def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
"""Open an individual database with the given flags.
flags are passed directly to the underlying DB.set_flags() call.
+ Optional dbtype specifies the type of BerkeleyDB access method to
+ use. Optional reclen if not None gives the record length.
"""
d = db.DB(self._env)
if flags:
d.set_flags(flags)
# Our storage is based on the underlying BSDDB btree database type.
- d.open(self._prefix + name, db.DB_BTREE, db.DB_CREATE)
+ if reclen is not None:
+ d.set_re_len(reclen)
+ d.open(self._prefix + name, dbtype, db.DB_CREATE)
+ self._tables.append(d)
return d
def _setupDBs(self):
@@ -270,6 +301,14 @@
"""Close the storage by closing the databases it uses and by closing
its environment.
"""
+ # Close all the tables
+ if self._checkpointer:
+ zLOG.LOG('Full storage', zLOG.INFO,
+ 'stopping checkpointing thread')
+ self._checkpointer.stop()
+ self._checkpointer.join(SLEEP_TIME * 2)
+ for d in self._tables:
+ d.close()
# As recommended by Keith Bostic @ Sleepycat, we need to do
# two checkpoints just before we close the environment.
# Otherwise, auto-recovery on environment opens can be
@@ -285,15 +324,6 @@
self._env.close()
os.unlink(lockfile)
- def _docheckpoint(self):
- # Periodically checkpoint the database. This is called approximately
- # once per Berkeley transaction commit or abort.
- config = self._config
- config._counter += 1
- if config._counter > config.interval:
- self._env.txn_checkpoint(config.kbyte, config.min)
- config._counter = 0
-
def _update(self, deltas, data, incdec):
refdoids = []
referencesf(data, refdoids)
@@ -316,16 +346,27 @@
txn = self._env.txn_begin()
try:
ret = meth(txn, *args, **kws)
+ except PackStop:
+ # Escape hatch for shutdown during pack. Like the bare except --
+ # i.e. abort the transaction -- but swallow the exception.
+ txn.abort()
except:
#import traceback ; traceback.print_exc()
txn.abort()
- self._docheckpoint()
raise
else:
txn.commit()
- self._docheckpoint()
return ret
+ def docheckpoint(self):
+ config = self._config
+ self._lock_acquire()
+ try:
+ if not self._stop:
+ self._env.txn_checkpoint(config.kbyte, config.min)
+ finally:
+ self._lock_release()
+
def env_from_string(envname, config):
@@ -356,10 +397,55 @@
gbytes, bytes = divmod(config.cachesize, GBYTES)
env.set_cachesize(gbytes, bytes)
env.open(envname,
- db.DB_CREATE # create underlying files as necessary
- | db.DB_RECOVER # run normal recovery before opening
- | db.DB_INIT_MPOOL # initialize shared memory buffer pool
- | db.DB_INIT_TXN # initialize transaction subsystem
- | db.DB_THREAD # we use the environment from other threads
+ db.DB_CREATE # create underlying files as necessary
+ | db.DB_RECOVER_FATAL # run normal recovery before opening
+ | db.DB_INIT_MPOOL # initialize shared memory buffer pool
+ | db.DB_INIT_TXN # initialize transaction subsystem
+ | db.DB_THREAD # we use the environment from other threads
)
return env, lockfile
+
+
+
+class _WorkThread(threading.Thread):
+ def __init__(self, storage, checkinterval, name='work'):
+ threading.Thread.__init__(self)
+ self._storage = storage
+ self._interval = checkinterval
+ self._name = name
+ # Bookkeeping
+ self._stop = False
+ self._nextcheck = checkinterval
+ # We don't want these threads to hold up process exit. That could
+ # lead to corrupt databases, but recovery should ultimately save us.
+ self.setDaemon(True)
+
+ def run(self):
+ name = self._name
+ zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread started' % name)
+ while not self._stop:
+ now = time.time()
+ if now > self._nextcheck:
+ zLOG.LOG('Berkeley storage', zLOG.INFO, 'running %s' % name)
+ self._dowork(now)
+ self._nextcheck = now + self._interval
+ # Now we sleep for a little while before we check again. Sleep
+ # for the minimum of self._interval and SLEEP_TIME so as to be as
+ # responsive as possible to .stop() calls.
+ time.sleep(min(self._interval, SLEEP_TIME))
+ zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread finished' % name)
+
+ def stop(self):
+ self._stop = True
+
+ def _dowork(self):
+ pass
+
+
+
+class _Checkpoint(_WorkThread):
+ def __init__(self, storage, interval):
+ _WorkThread.__init__(self, storage, interval, 'checkpointing')
+
+ def _dowork(self, now):
+ self._storage.docheckpoint()