[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - BerkeleyBase.py:1.30 Full.py:1.57 Minimal.py:1.21
Barry Warsaw
barry@wooz.org
Fri, 6 Dec 2002 00:58:38 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv28652/bsddb3Storage
Modified Files:
BerkeleyBase.py Full.py Minimal.py
Log Message:
Rework the background threads to be poll-with-timeout based instead of
sleep based. We create two ends of a pipe in the parent thread, with
the child thread polling/reading one end and the parent writing to the
other. The only thing written to this pipe is a "stop marker" -- just
a string that wakes the thread up immediately when we're closing the
storage.
The primary reason for this is to speed up shutdown so that we don't
have to wait for the next tick of the sleep counter before we'll
trigger the thread shutdown. Makes shutting down Zope using this
storage much quicker.
Specific changes include:
BerkeleyBase.py
SLEEP_TIME -> JOIN_TIME since it's now just the interval we'll
wait for the thread.join to complete.
__init__(): Create both the checkpointer thread and the autopacker
thread, set up the pipes, and get them both rolling. We refactor
creation of the autopacker instance into a separate overrideable
method since this is the one bit that's different between the two
storages.
_make_autopacker(): Intended to be overridden.
close(), _doclose(): Move the thread shutdown code out of the
lock, since we don't want to potentially deadlock the shutting
down of the background thread. This could happen if close() was
entered when the autopacker thread was between lock acquisitions
in _dopack(). Also, be sure to write to the pipe to wake the
child threads up immediately.
env_from_string(): Wrap the actual creation and opening of the
environment in a try/except, so that if there's a failure, we can
be sure to give up the file lock.
_WorkThread class: Accept a poll object which wraps the read end
of the pipe. Rework run() to do the poll-with-timeout instead of
sleep. I don't think we strictly need to do the fd read given the
simplicity (and mono-commandity) of the protocol, but it can't
hurt. The _dowork() method's signature no longer contains the
`now' variable.
Full.py
_make_autopacker(): Override base class to return storage specific
_Autopack instance.
Minimal.py
Same, but also include some code cleanup.
Also, get rid of some unnecessary imports.
=== ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py 1.29 => 1.30 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py:1.29 Tue Dec 3 14:13:42 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py Fri Dec 6 00:58:38 2002
@@ -19,6 +19,7 @@
import os
import time
import errno
+import select
import threading
from types import StringType
@@ -28,7 +29,6 @@
# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
-from ZODB import POSException
from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
@@ -37,12 +37,11 @@
GBYTES = 1024 * 1024 * 1000
-# Maximum number of seconds for background thread to sleep before checking to
-# see if it's time for another autopack run. Lower numbers mean more
-# processing, higher numbers mean less responsiveness to shutdown requests.
-# 10 seconds seems like a good compromise. Note that if the check interval is
-# less than the sleep time, the minimum will be used.
-SLEEP_TIME = 10
+# How long should we wait to join one of the background daemon threads? It's
+# a good idea to not set this too short, or we could corrupt our database.
+# That would be recoverable, but recovery could take a long time too, so it's
+# better to shutdown cleanly.
+JOIN_TIME = 10
try:
True, False
@@ -189,7 +188,6 @@
# Instantiate a pack lock
self._packlock = ThreadLock.allocate_lock()
- self._autopacker = None
self._stop = self._closed = False
# Initialize a few other things
self._prefix = prefix
@@ -199,11 +197,27 @@
self._setupDBs()
# Initialize the object id counter.
self._init_oid()
+ # Set up the checkpointing thread
if config.interval > 0:
- self._checkpointer = _Checkpoint(self, config.interval)
+ r, self._checkpointfd = os.pipe()
+ poll = select.poll()
+ poll.register(r, select.POLLIN)
+ self._checkpointer = _Checkpoint(self, poll, config.interval)
self._checkpointer.start()
else:
self._checkpointer = None
+ # Set up the autopacking thread
+ if config.frequency > 0:
+ r, self._autopackfd = os.pipe()
+ poll = select.poll()
+ poll.register(r, select.POLLIN)
+ self._autopacker = self._make_autopacker(poll)
+ self._autopacker.start()
+ else:
+ self._autopacker = None
+
+ def _make_autopacker(self, poll):
+ raise NotImplementedError
def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
"""Open an individual database with the given flags.
@@ -321,9 +335,21 @@
tables are closed, and finally the environment is force checkpointed
and closed too.
"""
- # Set this flag before acquiring the lock so we don't block waiting
- # for the autopack thread to give up the lock.
+ # We have to shutdown the background threads before we acquire the
+ # lock, or we could end up closing the environment before the
+ # autopacking thread exits.
self._stop = True
+ # Stop the autopacker thread
+ if self._autopacker:
+ self.log('stopping autopacking thread')
+ self._autopacker.stop()
+ os.write(self._autopackfd, 'STOP')
+ self._autopacker.join(JOIN_TIME)
+ if self._checkpointer:
+ self.log('stopping checkpointing thread')
+ self._checkpointer.stop()
+ os.write(self._checkpointfd, 'STOP')
+ self._checkpointer.join(JOIN_TIME)
self._lock_acquire()
try:
if not self._closed:
@@ -333,15 +359,6 @@
self._lock_release()
def _doclose(self):
- # Stop the autopacker thread
- if self._autopacker:
- self.log('stopping autopacking thread')
- self._autopacker.stop()
- self._autopacker.join(SLEEP_TIME * 2)
- if self._checkpointer:
- self.log('stopping checkpointing thread')
- self._checkpointer.stop()
- self._checkpointer.join(SLEEP_TIME * 2)
# Close all the tables
for d in self._tables:
d.close()
@@ -426,30 +443,36 @@
lock_file(lockfile)
lockfile.write(str(os.getpid()))
lockfile.flush()
- # Create, initialize, and open the environment
- env = db.DBEnv()
- if config.logdir is not None:
- env.set_lg_dir(config.logdir)
- gbytes, bytes = divmod(config.cachesize, GBYTES)
- env.set_cachesize(gbytes, bytes)
- env.open(envname,
- db.DB_CREATE # create underlying files as necessary
- | db.DB_RECOVER # run normal recovery before opening
- | db.DB_INIT_MPOOL # initialize shared memory buffer pool
- | db.DB_INIT_TXN # initialize transaction subsystem
- | db.DB_THREAD # we use the environment from other threads
- )
+ try:
+ # Create, initialize, and open the environment
+ env = db.DBEnv()
+ if config.logdir is not None:
+ env.set_lg_dir(config.logdir)
+ gbytes, bytes = divmod(config.cachesize, GBYTES)
+ env.set_cachesize(gbytes, bytes)
+ env.open(envname,
+ db.DB_CREATE # create underlying files as necessary
+ | db.DB_RECOVER # run normal recovery before opening
+ | db.DB_INIT_MPOOL # initialize shared memory buffer pool
+ | db.DB_INIT_TXN # initialize transaction subsystem
+ | db.DB_THREAD # we use the env from multiple threads
+ )
+ except:
+ lockfile.close()
+ raise
return env, lockfile
class _WorkThread(threading.Thread):
- def __init__(self, storage, checkinterval, name='work'):
+ def __init__(self, storage, poll, checkinterval, name='work'):
threading.Thread.__init__(self)
self._storage = storage
+ self._poll = poll
self._interval = checkinterval
self._name = name
- # Bookkeeping
+ # Bookkeeping. _nextcheck is useful as a non-public interface aiding
+ # testing. See test_autopack.py.
self._stop = False
self._nextcheck = checkinterval
# We don't want these threads to hold up process exit. That could
@@ -461,27 +484,34 @@
self._storage.log('%s thread started', name)
while not self._stop:
now = time.time()
- if now > self._nextcheck:
- self._storage.log('running %s', name)
- self._dowork(now)
- self._nextcheck = now + self._interval
- # Now we sleep for a little while before we check again. Sleep
- # for the minimum of self._interval and SLEEP_TIME so as to be as
- # responsive as possible to .stop() calls.
- time.sleep(min(self._interval, SLEEP_TIME))
+ if now < self._nextcheck:
+ continue
+ self._storage.log('running %s', name)
+ self._dowork()
+ self._nextcheck = now + self._interval
+ # Now we sleep for a little while before we check again. We use a
+ # poll timeout so that when the parent thread writes its "stop
+ # marker" to the readfd, we'll exit out immediately.
+ fds = self._poll.poll(self._interval * 1000)
+ for fd, event in self._poll.poll(self._interval):
+ # Just read and throw away the data. The _stop flag will
+ # already have been set if we're being shutdown.
+ if event & select.POLLIN:
+ #print name, 'data:', os.read(fd, 1024)
+ os.read(fd, 1024)
self._storage.log('%s thread finished', name)
def stop(self):
self._stop = True
- def _dowork(self, now):
+ def _dowork(self):
pass
class _Checkpoint(_WorkThread):
- def __init__(self, storage, interval):
- _WorkThread.__init__(self, storage, interval, 'checkpointing')
+ def __init__(self, storage, poll, interval):
+ _WorkThread.__init__(self, storage, poll, interval, 'checkpointing')
- def _dowork(self, now):
+ def _dowork(self):
self._storage.docheckpoint()
=== ZODB3/bsddb3Storage/bsddb3Storage/Full.py 1.56 => 1.57 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Full.py:1.56 Tue Dec 3 14:12:34 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Full.py Fri Dec 6 00:58:38 2002
@@ -17,9 +17,7 @@
__version__ = '$Revision$'.split()[-2:][0]
-import sys
import time
-import threading
import cPickle as pickle
from struct import pack, unpack
@@ -238,15 +236,14 @@
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
- # Set up the autopacking thread
+
+ def _make_autopacker(self, poll):
config = self._config
- if config.frequency > 0:
- lastpacktime = U64(self._last_packtime())
- self._autopacker = _Autopack(
- self, config.frequency,
- config.packtime, config.classicpack,
- lastpacktime)
- self._autopacker.start()
+ lastpacktime = U64(self._last_packtime())
+ return _Autopack(
+ self, poll, config.frequency,
+ config.packtime, config.classicpack,
+ lastpacktime)
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
@@ -1861,16 +1858,16 @@
class _Autopack(_WorkThread):
- def __init__(self, storage, frequency, packtime, classicpack,
+ def __init__(self, storage, poll, frequency, packtime, classicpack,
lastpacktime):
- _WorkThread.__init__(self, storage, frequency, 'autopacking')
+ _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
self._packtime = packtime
self._classicpack = classicpack
# Bookkeeping
self._stop = False
self._lastclassic = 0
- def _dowork(self, now):
+ def _dowork(self):
# Should we do a classic pack this time?
if self._classicpack <= 0:
classicp = False
@@ -1879,4 +1876,4 @@
self._lastclassic = v
classicp = not v
# Run the autopack phase
- self._storage.autopack(now - self._packtime, classicp)
+ self._storage.autopack(time.time() - self._packtime, classicp)
=== ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py 1.20 => 1.21 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py:1.20 Tue Dec 3 14:10:09 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/Minimal.py Fri Dec 6 00:58:38 2002
@@ -17,16 +17,13 @@
__version__ = '$Revision$'[-2:][0]
-import time
-import threading
-
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
# PyBSDDB3.
from bsddb3 import db
from ZODB import POSException
-from ZODB.utils import U64, p64
+from ZODB.utils import p64, U64
from ZODB.referencesf import referencesf
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
@@ -40,12 +37,6 @@
PRESENT = 'X'
ZERO = '\0'*8
-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run. Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests. 10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
try:
True, False
except NameError:
@@ -131,11 +122,9 @@
self._withtxn(self._docommit, tid)
finally:
self._lock_release()
- # Set up the autopacking thread
- if self._config.frequency > 0:
- config = self._config
- self._autopacker = _Autopack(self, config.frequency)
- self._autopacker.start()
+
+ def _make_autopacker(self, poll):
+ return _Autopack(self, poll, self._config.frequency)
def _doabort(self, txn, tid):
co = cs = None
@@ -548,9 +537,9 @@
class _Autopack(_WorkThread):
- def __init__(self, storage, frequency):
- _WorkThread.__init__(self, storage, frequency, 'autopacking')
+ def __init__(self, storage, poll, frequency):
+ _WorkThread.__init__(self, storage, poll, frequency, 'autopacking')
- def _dowork(self, now):
+ def _dowork(self):
# Run the autopack phase
self._storage.pack('ignored', referencesf)