[Zope-Checkins] SVN: Zope/trunk/lib/python/Products/Transience/
Merge transience changes from chrism-pre27-branch.
Chris McDonough
chrism at plope.com
Fri Sep 17 23:27:58 EDT 2004
Log message for revision 27630:
Merge transience changes from chrism-pre27-branch.
Changed:
U Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx
A Zope/trunk/lib/python/Products/Transience/TransactionHelper.py
U Zope/trunk/lib/python/Products/Transience/Transience.py
U Zope/trunk/lib/python/Products/Transience/TransientObject.py
U Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml
A Zope/trunk/lib/python/Products/Transience/tests/testCounters.py
A Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py
U Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py
-=-
Modified: Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx
===================================================================
--- Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx 2004-09-18 03:27:57 UTC (rev 27630)
@@ -42,6 +42,7 @@
inside of the "_data" structure. There is a concept of a
"current" bucket, which is the bucket that is contained within the
_data structure with a key equal to the "current" timeslice.
+ A current bucket must always exist (this is an invariant).
- A "max_timeslice" integer, which is equal to the "largest"
timeslice for which there exists a bucket in the _data structure.
@@ -74,10 +75,13 @@
Replentishing
The TOC performs "finalization", "garbage collection", and "bucket
- replentishing". It performs these tasks "in-band". This means that
- the TOC does not maintain a separate thread that wakes up every so
- often to do these housekeeping tasks. Instead, during the course of
- normal operations, the TOC opportunistically performs them.
+ replentishing". It typically performs these tasks "in-band"
+ (although it is possible to do the housekeeping tasks "out of band"
+ as well: see the methods of the Transient Object Container with
+ "housekeep" in their names). "In band" housekeeping implies that
+ the TOC does not maintain a separate thread or process that wakes up
+ every so often to clean up. Instead, during the course of normal
+ operations, the TOC opportunistically performs housekeeping functions.
Finalization is defined as optionally calling a function at bucket
expiration time against all transient objects contained within that
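The timeslice/bucket scheme described above can be pictured with a
minimal sketch (not part of this commit; the names and the 20-second
period are illustrative, though 20 is the TOC default):

    import time
    from BTrees.IOBTree import IOBTree
    from BTrees.OOBTree import OOBTree

    PERIOD = 20  # seconds per timeslice

    def current_timeslice(period=PERIOD):
        # round "now" down to the start of its timeslice
        now = int(time.time())
        return now - (now % period)

    data = IOBTree()                       # timeslice -> bucket
    data[current_timeslice()] = OOBTree()  # the "current" bucket

    # writes always land in the current bucket, which is why a
    # current bucket must always exist (the invariant noted above)
    data[current_timeslice()]['some-key'] = 'some value'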
Added: Zope/trunk/lib/python/Products/Transience/TransactionHelper.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/TransactionHelper.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/TransactionHelper.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,36 @@
+import time
+
+class PreventTransactionCommit(Exception):
+ def __init__(self, reason):
+ self.reason = reason
+
+ def __str__(self):
+ return "Uncommittable transaction: " % self.reason
+
+class UncommittableJar:
+ """ A jar that cannot be committed """
+ def __init__(self, reason):
+ self.reason = reason
+ self.time = time.time()
+
+ def sort_key(self):
+ return self.time
+
+ def tpc_begin(self, *arg, **kw):
+ pass
+
+ def commit(self, obj, transaction):
+ pass
+
+ def tpc_vote(self, transaction):
+ raise PreventTransactionCommit(self.reason)
+
+class makeTransactionUncommittable:
+ """
+ - register an uncommittable object with the provided transaction
+ which prevents the commit of that transaction
+ """
+ def __init__(self, transaction, reason):
+ self._p_jar = UncommittableJar(reason)
+ transaction.register(self)
+
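A minimal usage sketch for the helper above (mirroring the new
testTransactionHelper.py below and assuming the Zope 2.7-era
get_transaction() builtin):

    from Products.Transience.TransactionHelper import \
         makeTransactionUncommittable, PreventTransactionCommit

    t = get_transaction()
    makeTransactionUncommittable(t, "housekeeping invariant violated")
    try:
        t.commit()
    except PreventTransactionCommit:
        pass  # expected: the registered UncommittableJar vetoed
              # the commit from its tpc_vote hook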
Modified: Zope/trunk/lib/python/Products/Transience/Transience.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/Transience.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/Transience.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -30,13 +30,13 @@
TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
StringKeyedHomogeneousItemContainer, TransientItemContainer
-from BTrees.Length import Length
+from BTrees.Length import Length as BTreesLength
from BTrees.OOBTree import OOBTree
from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
from Persistence import Persistent
from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager, \
setSecurityManager
@@ -129,6 +129,7 @@
_limit = 0
_data = None
+ _inband_housekeeping = True
security.setDefaultAccess('deny')
@@ -206,7 +207,7 @@
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
- # _gc() and _replentish()).
+ # _replentish() and _gc()).
self._data = DATA_CLASS()
# populate _data with some number of buckets, each of which
@@ -232,6 +233,10 @@
# each expired item.
self._last_finalized_timeslice = Increaser(-self._period)
+ # '_last_gc_timeslice' is a value that indicates in which
+ # timeslice the garbage collection process was last run.
+ self._last_gc_timeslice = Increaser(-self._period)
+
# our "_length" is the number of "active" data objects in _data.
# it does not include items that are still kept in _data but need to
# be garbage collected.
@@ -239,8 +244,10 @@
# we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive, and it
# doesn't really tell us which ones are "active" anyway.
- try: self._length.set(0)
- except AttributeError: self._length = self.getLen = Length()
+ try:
+ self._length.set(0)
+ except AttributeError:
+ self._length = self.getLen = Length2()
def _getCurrentSlices(self, now):
if self._timeout_slices:
@@ -269,16 +276,15 @@
bucket = self._data.get(0)
return bucket.get(k, default)
- # always call finalize
- self._finalize(current_ts)
+ if self._inband_housekeeping:
+ self._housekeep(current_ts)
- # call gc and/or replentish on an only-as needed basis
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
+ else:
+ # don't allow the TOC to stop working in an emergency bucket
+ # shortage
+ if self._in_emergency_bucket_shortage(current_ts):
+ self._replentish(current_ts)
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
-
# SUBTLETY ALERT TO SELF: do not "improve" the code below
# unnecessarily, as it will end only in tears. The lack of aliases
# and the ordering is intentional.
@@ -288,7 +294,8 @@
found_ts = None
for ts in current_slices:
- abucket = self._data.get(ts, None)
+ abucket = self._data.get(ts, None) # XXX ReadConflictError hotspot
+
if abucket is None:
DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
continue
@@ -348,14 +355,13 @@
else:
current_ts = 0
- self._finalize(current_ts)
+ if self._inband_housekeeping:
+ self._housekeep(current_ts)
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
+ elif self._in_emergency_bucket_shortage(current_ts):
+ # if our scheduler fails, don't allow the TOC to stop working
+ self._replentish(current_ts, force=True)
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
-
STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts)
@@ -374,8 +380,8 @@
def keys(self):
return self._all().keys()
- def rawkeys(self, current_ts):
- # for debugging
+ def raw(self, current_ts):
+ # for debugging and unit testing
current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer
@@ -425,15 +431,20 @@
STRICT and _assert(self._data.has_key(current_ts))
if item is _marker:
# the key didn't already exist, this is a new item
- if self._limit and len(self) >= self._limit:
+
+ length = self._length() # XXX ReadConflictError hotspot
+
+ if self._limit and length >= self._limit:
LOG('Transience', WARNING,
('Transient object container %s max subobjects '
'reached' % self.getId())
)
raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" %
- (len(self), self._limit))
- self._length.change(1)
+ (length, self._limit))
+
+ self._length.increment(1)
+
DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' %
(k, current_ts))
current_bucket = self._data[current_ts]
@@ -460,7 +471,11 @@
if not issubclass(BUCKET_CLASS, Persistent):
# tickle persistence machinery
self._data[current_ts] = bucket
- self._length.change(-1)
+
+ # XXX does increment(-1) make any sense here?
+ # rationale from dunny: we are removing an item rather than simply
+ # declaring it to be unused?
+ self._length.increment(-1)
return current_ts, item
def __len__(self):
@@ -496,79 +511,46 @@
DEBUG and TLOG('has_key: returning false for %s' % k)
return False
- def _roll(self, now, reason):
- """
- Roll the dice to see if we're the lucky thread that does
- bucket replentishment or gc. This method is guaranteed to return
- true at some point as the difference between high and low naturally
- diminishes to zero.
+ def _get_max_expired_ts(self, now):
+ return now - (self._period * (self._timeout_slices + 1))
- The reason we do the 'random' dance in the last part of this
- is to minimize the chance that two threads will attempt to
- do housekeeping at the same time (causing conflicts).
- """
+ def _in_emergency_bucket_shortage(self, now):
+ max_ts = self._max_timeslice()
low = now/self._period
- high = self._max_timeslice()/self._period
- if high <= low:
- # we really need to win this roll because we have no
- # spare buckets (and no valid values to provide to randrange), so
- # we rig the toss.
- DEBUG and TLOG('_roll: %s rigged toss' % reason)
- return True
- else:
- # we're not in an emergency bucket shortage, so we can
- # take our chances during the roll. It's unlikely that
- # two threads will win the roll simultaneously, so we
- # avoid a certain class of conflicts here.
- if random.randrange(low, high) == low: # WINNAH!
- DEBUG and TLOG("_roll: %s roll winner" % reason)
- return True
- DEBUG and TLOG("_roll: %s roll loser" % reason)
- return False
+ high = max_ts/self._period
+ required = high <= low
+ return required
- def _get_max_expired_ts(self, now):
- return now - (self._period * (self._timeout_slices + 1))
-
def _finalize(self, now):
+ """ Call finalization handlers for the data in each stale bucket """
if not self._timeout_slices:
DEBUG and TLOG('_finalize: doing nothing (no timeout)')
return # don't do any finalization if there is no timeout
# The nature of sessioning is that when the timeslice rolls
# over, all active threads will try to do a lot of work during
- # finalization, all but one unnecessarily. We really don't
- # want more than one thread at a time to try to finalize
- # buckets at the same time so we try to lock. We give up if we
- # can't lock immediately because it doesn't matter if we skip
- # a couple of opportunities for finalization, as long as it
- # gets done by some thread eventually. A similar pattern
- # exists for _gc and _replentish.
+ # finalization if inband housekeeping is enabled, all but one
+ # unnecessarily. We really don't want more than one thread at
+ # a time to try to finalize buckets at the same time so we try
+ # to lock. We give up if we can't lock immediately because it
+ # doesn't matter if we skip a couple of opportunities for
+ # finalization, as long as it gets done by some thread
+ # eventually. A similar pattern exists for _gc and
+ # _replentish.
if not self.finalize_lock.acquire(0):
- DEBUG and TLOG('_finalize: couldnt acquire lock')
+ DEBUG and TLOG('_finalize: could not acquire lock, returning')
return
try:
DEBUG and TLOG('_finalize: lock acquired successfully')
+ last_finalized = self._last_finalized_timeslice()
- if now is None:
- now = getCurrentTimeslice(self._period) # for unit tests
-
# we want to start finalizing from one timeslice after the
- # timeslice which we last finalized. Note that finalizing
- # an already-finalized bucket somehow sends persistence
- # into a spin with an exception later raised:
- # "SystemError: error return without exception set",
- # typically coming from
- # Products.Sessions.SessionDataManager, line 182, in
- # _getSessionDataObject (if getattr(ob, '__of__', None)
- # and getattr(ob, 'aq_parent', None)). According to this
- # email message from Jim, it may be because the ob is
- # ghosted and doesn't have a _p_jar somehow:
- #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
+ # timeslice which we last finalized.
+
+ start_finalize = last_finalized + self._period
- start_finalize = self._last_finalized_timeslice() + self._period
-
# we want to finalize only up to the maximum expired timeslice
max_ts = self._get_max_expired_ts(now)
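To make _in_emergency_bucket_shortage() above concrete: with
_period = 20, now = 1000 and _max_timeslice() = 1000 give
low = high = 50, so high <= low and we are in a shortage (the newest
bucket is the current one; no spares remain). A standalone sketch of
the same arithmetic, with illustrative values:

    PERIOD = 20  # matches the TOC default period

    def in_emergency_bucket_shortage(now, max_timeslice, period=PERIOD):
        # True when no bucket newer than the current timeslice exists,
        # i.e. the "current bucket" invariant is about to be violated
        low = now // period
        high = max_timeslice // period
        return high <= low

    assert in_emergency_bucket_shortage(1000, 1000)      # no spares left
    assert not in_emergency_bucket_shortage(1000, 1040)  # two spares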
@@ -577,124 +559,221 @@
'_finalize: start_finalize (%s) >= max_ts (%s), '
'doing nothing' % (start_finalize, max_ts))
return
-
- DEBUG and TLOG('_finalize: now is %s' % now)
- DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
- DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
+ else:
+ DEBUG and TLOG(
+ '_finalize: start_finalize (%s) <= max_ts (%s), '
+ 'finalization possible' % (start_finalize, max_ts))
+ # we don't try to avoid conflicts here by doing a "random"
+ # dance (ala _replentish and _gc) because it's important that
+ # buckets are finalized as soon as possible after they've
+ # expired in order to call the delete notifier "on time".
+ self._do_finalize_work(now, max_ts, start_finalize)
- to_finalize = list(self._data.keys(start_finalize, max_ts))
- DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
+ finally:
+ self.finalize_lock.release()
- delta = 0
+ def _do_finalize_work(self, now, max_ts, start_finalize):
+ # this is only separated from _finalize for readability; it
+ # should generally not be called by anything but _finalize
+ DEBUG and TLOG('_do_finalize_work: entering')
+ DEBUG and TLOG('_do_finalize_work: now is %s' % now)
+ DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
+ DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
+ start_finalize)
- for key in to_finalize:
+ to_finalize = list(self._data.keys(start_finalize, max_ts))
+ DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
- assert(start_finalize <= key <= max_ts)
- STRICT and _assert(self._data.has_key(key))
- values = list(self._data[key].values())
- DEBUG and TLOG('_finalize: values to notify from ts %s '
- 'are %s' % (key, `list(values)`))
+ delta = 0
- delta += len(values)
+ for key in to_finalize:
- for v in values:
- self.notifyDel(v)
+ _assert(start_finalize <= key)
+ _assert(key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ values = list(self._data[key].values())
+ DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
+ 'are %s' % (key, `list(values)`))
- if delta:
- self._length.change(-delta)
+ delta += len(values)
- DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
- 'to max_ts of %s' % max_ts)
+ for v in values:
+ self.notifyDel(v)
- self._last_finalized_timeslice.set(max_ts)
+ if delta:
+ self._length.decrement(delta)
- finally:
- self.finalize_lock.release()
+ DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
+ 'to max_ts of %s' % max_ts)
+ self._last_finalized_timeslice.set(max_ts)
+
+ def _invoke_finalize_and_gc(self):
+ # for unit testing purposes only!
+ last_finalized = self._last_finalized_timeslice()
+ now = getCurrentTimeslice(self._period) # for unit tests
+ start_finalize = last_finalized + self._period
+ max_ts = self._get_max_expired_ts(now)
+ self._do_finalize_work(now, max_ts, start_finalize)
+ self._do_gc_work(now)
+
def _replentish(self, now):
- # available_spares == the number of "spare" buckets that exist in
- # "_data"
+ """ Add 'fresh' future or current buckets """
if not self._timeout_slices:
- return # do nothing if no timeout
+ DEBUG and TLOG('_replentish: no timeout, doing nothing')
+ return
- if not self.replentish_lock.acquire(0):
- DEBUG and TLOG('_replentish: couldnt acquire lock')
- return
+ # the difference between high and low naturally diminishes to
+ # zero as now approaches self._max_timeslice() during normal
+ # operations. If high <= low, it means we have no current bucket,
+ # so we *really* need to replentish (having a current bucket is
+ # an invariant for continued operation).
+ required = self._in_emergency_bucket_shortage(now)
+ lock_acquired = self.replentish_lock.acquire(0)
+
try:
- max_ts = self._max_timeslice()
- available_spares = (max_ts-now) / self._period
- DEBUG and TLOG('_replentish: now = %s' % now)
- DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
- DEBUG and TLOG('_replentish: available_spares = %s'
- % available_spares)
+ if required:
+ # we're in an emergency bucket shortage, we need to
+ # replentish regardless of whether we got the lock or
+ # not. (if we didn't get the lock, this transaction
+ # will likely result in a conflict error, that's ok)
+ if lock_acquired:
+ DEBUG and TLOG('_replentish: required, lock acquired')
+ else:
+ DEBUG and TLOG('_replentish: required, lock NOT acquired')
+ max_ts = self._max_timeslice()
+ self._do_replentish_work(now, max_ts)
- if available_spares >= SPARE_BUCKETS:
- DEBUG and TLOG('_replentish: available_spares (%s) >= '
- 'SPARE_BUCKETS (%s), doing '
- 'nothing'% (available_spares,
- SPARE_BUCKETS))
- return
+ elif lock_acquired:
+ # If replentish is optional, minimize the chance that
+ # two threads will attempt to do replentish work at
+ # the same time (which causes conflicts) by
+ # introducing a random element.
+ DEBUG and TLOG('_replentish: attempting optional replentish '
+ '(lock acquired)')
+ max_ts = self._max_timeslice()
+ low = now/self._period
+ high = max_ts/self._period
+ if roll(low, high, 'optional replentish'):
+ self._do_replentish_work(now, max_ts)
- if max_ts < now:
- replentish_start = now
- replentish_end = now + (self._period * SPARE_BUCKETS)
-
else:
- replentish_start = max_ts + self._period
- replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+ # This is an optional replentish and we can't acquire
+ # the lock, bail.
+ DEBUG and TLOG('_replentish: optional attempt aborted, could '
+ 'not acquire lock.')
+ return
- DEBUG and TLOG('_replentish: replentish_start = %s' %
- replentish_start)
- DEBUG and TLOG('_replentish: replentish_end = %s'
- % replentish_end)
- # n is the number of buckets to create
- n = (replentish_end - replentish_start) / self._period
- new_buckets = getTimeslices(replentish_start, n, self._period)
- new_buckets.reverse()
- STRICT and _assert(new_buckets)
- DEBUG and TLOG('_replentish: adding %s new buckets' % n)
- DEBUG and TLOG('_replentish: buckets to add = %s'
- % new_buckets)
- for k in new_buckets:
- STRICT and _assert(not self._data.has_key(k))
- try:
- self._data[k] = BUCKET_CLASS()
- except ConflictError:
- DEBUG and TLOG('_replentish: conflict when adding %s' % k)
- time.sleep(random.uniform(0, 1)) # add entropy
- raise
- self._max_timeslice.set(max(new_buckets))
finally:
- self.replentish_lock.release()
+ if lock_acquired:
+ self.replentish_lock.release()
+ def _do_replentish_work(self, now, max_ts):
+ DEBUG and TLOG('_do_replentish_work: entering')
+ # this is only separated from _replentish for readability; it
+ # should generally not be called by anything but _replentish
+
+ # available_spares == the number of "spare" buckets that exist
+ # in "_data"
+ available_spares = (max_ts - now) / self._period
+ DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+ DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+ DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+ % available_spares)
+
+ if available_spares >= SPARE_BUCKETS:
+ DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+ 'SPARE_BUCKETS (%s), doing '
+ 'nothing' % (available_spares,
+ SPARE_BUCKETS))
+ return
+
+ if max_ts < now:
+ # the newest bucket in self._data is older than now!
+ replentish_start = now
+ replentish_end = now + (self._period * SPARE_BUCKETS)
+
+ else:
+ replentish_start = max_ts + self._period
+ replentish_end = max_ts + (self._period * (SPARE_BUCKETS + 1))
+
+ DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+ replentish_start)
+ DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+ % replentish_end)
+ # n is the number of buckets to create
+ n = (replentish_end - replentish_start) / self._period
+ new_buckets = getTimeslices(replentish_start, n, self._period)
+ new_buckets.reverse()
+ STRICT and _assert(new_buckets)
+ DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+ DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+ % new_buckets)
+ for k in new_buckets:
+ STRICT and _assert(not self._data.has_key(k))
+ self._data[k] = BUCKET_CLASS() # XXX ReadConflictError hotspot
+
+ self._max_timeslice.set(max(new_buckets))
+
def _gc(self, now=None):
+ """ Remove stale buckets """
if not self._timeout_slices:
return # dont do gc if there is no timeout
+ # give callers a good chance to do nothing (gc isn't as important
+ # as replentishment or finalization)
+ if not roll(0, 5, 'gc'):
+ DEBUG and TLOG('_gc: lost roll, doing nothing')
+ return
+
if not self.gc_lock.acquire(0):
DEBUG and TLOG('_gc: couldnt acquire lock')
return
- try:
+ try:
if now is None:
now = getCurrentTimeslice(self._period) # for unit tests
- # we want to garbage collect all buckets that have already been run
- # through finalization
- max_ts = self._last_finalized_timeslice()
+ last_gc = self._last_gc_timeslice()
+ gc_every = self._period * round(SPARE_BUCKETS / 2.0)
- DEBUG and TLOG('_gc: now is %s' % now)
- DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
+ if (now - last_gc) < gc_every:
+ DEBUG and TLOG('_gc: gc attempt not yet required '
+ '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+ return
+ else:
+ DEBUG and TLOG(
+ '_gc: (%s - %s) >= %s, gc invoked' % (now, last_gc,
+ gc_every))
+ self._do_gc_work(now)
- for key in list(self._data.keys(None, max_ts)):
- assert(key <= max_ts)
- STRICT and _assert(self._data.has_key(key))
- DEBUG and TLOG('deleting %s from _data' % key)
- del self._data[key]
finally:
self.gc_lock.release()
+ def _do_gc_work(self, now):
+ # this is only separated from _gc for readability; it should
+ # generally not be called by anything but _gc
+
+ # we garbage collect any buckets that have already been run
+ # through finalization
+ DEBUG and TLOG('_do_gc_work: entering')
+
+ max_ts = self._last_finalized_timeslice()
+
+ DEBUG and TLOG('_do_gc_work: max_ts is %s' % max_ts)
+ to_gc = list(self._data.keys(None, max_ts))
+ DEBUG and TLOG('_do_gc_work: to_gc is: %s' % str(to_gc))
+
+ for key in to_gc:
+ _assert(key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ DEBUG and TLOG('_do_gc_work: deleting %s from _data' % key)
+ del self._data[key]
+
+ DEBUG and TLOG('_do_gc_work: setting last_gc_timeslice to %s' % now)
+ self._last_gc_timeslice.set(now)
+
def notifyAdd(self, item):
DEBUG and TLOG('notifyAdd with %s' % item)
callback = self._getCallback(self._addCallback)
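The housekeeping methods above share one pattern: a non-blocking lock
attempt, an optional dice roll (see the module-level roll() added
later in this diff) to spread the work across threads, then delegation
to a _do_*_work() helper. A condensed, illustrative sketch of that
pattern (not the committed code):

    import random
    import threading

    housekeeping_lock = threading.Lock()  # per-task lock, as in the TOC

    def attempt_housekeeping(now, max_ts, period, do_work):
        # skip entirely if another thread is already housekeeping;
        # missing one opportunity is fine, it gets done eventually
        if not housekeeping_lock.acquire(0):
            return
        try:
            low, high = now // period, max_ts // period
            try:
                won = random.randrange(low, high) == low
            except ValueError:
                won = True  # empty range: emergency, rig the toss
            if won:
                do_work(now)  # e.g. a _do_gc_work-style callable
        finally:
            housekeeping_lock.release()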
@@ -830,13 +909,37 @@
def setDelNotificationTarget(self, f):
self._delCallback = f
- security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
- def nudge(self):
- """ Used by mgmt interface to maybe do housekeeping each time
- a screen is shown """
- # run garbage collector so view is correct
- self._gc()
+ security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
+ def disableInbandHousekeeping(self):
+ """ No longer perform inband housekeeping """
+ self._inband_housekeeping = False
+ security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
+ def enableInbandHousekeeping(self):
+ """ (Re)enable inband housekeeping """
+ self._inband_housekeeping = True
+
+ security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
+ def isInbandHousekeepingEnabled(self):
+ """ Report if inband housekeeping is enabled """
+ return self._inband_housekeeping
+
+ security.declareProtected('View', 'housekeep')
+ def housekeep(self):
+ """ Call this from a scheduler at least every
+ self._period * (SPARE_BUCKETS - 1) seconds to perform out of band
+ housekeeping """
+ # we can protect this method from being called too often by
+ # anonymous users as necessary in the future; we already have a lot
+ # of protection as-is though so no need to make it more complicated
+ # than necessary at the moment
+ self._housekeep(getCurrentTimeslice(self._period))
+
+ def _housekeep(self, now):
+ self._finalize(now)
+ self._replentish(now)
+ self._gc(now)
+
security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer')
def manage_changeTransientObjectContainer(
@@ -868,9 +971,17 @@
# f/w compat: 2.8 cannot use __len__ as an instance variable
if not state.has_key('_length'):
- length = state.get('__len__', Length())
+ length = state.get('__len__', Length2())
self._length = self.getLen = length
+ oldlength = state.get('_length')
+ if isinstance(oldlength, BTreesLength):
+ # TOCS prior to 2.7.3 had a BTrees.Length.Length object as
+ # the TOC length object, replace it with our own Length2
+ # that does our conflict resolution correctly:
+ sz = oldlength()
+ self._length = self.getLen = Length2(sz)
+
# TOCs prior to 2.7.1 took their period from a global
if not state.has_key('_period'):
self._period = 20 # this was the default for all prior releases
@@ -891,6 +1002,10 @@
if not state.has_key('_last_finalized_timeslice'):
self._last_finalized_timeslice = Increaser(-self._period)
+ # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+ if not state.has_key('_last_gc_timeslice'):
+ self._last_gc_timeslice = Increaser(-self._period)
+
# we should probably delete older attributes from state such as
# '_last_timeslice', '_deindex_next',and '__len__' here but we leave
# them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -919,6 +1034,22 @@
l.insert(0, begin + (x * period))
return l
+def roll(low, high, reason):
+ try:
+ result = random.randrange(low, high)
+ except ValueError:
+ # empty range, must win this roll
+ result = low
+
+ if result == low:
+ DEBUG and TLOG('roll: low: %s, high: %s: won with %s (%s)' %
+ (low, high, result, reason))
+ return True
+ else:
+ DEBUG and TLOG('roll: low: %s, high: %s: lost with %s (%s)' %
+ (low, high, result, reason))
+ return False
+
def _assert(case):
if not case:
raise AssertionError
@@ -926,8 +1057,8 @@
class Increaser(Persistent):
"""
A persistent object representing a typically increasing integer that
- has conflict resolution uses the greatest integer out of the three
- available states
+ has conflict resolution which uses the greatest integer out of the three
+ available states.
"""
def __init__(self, v):
self.value = v
@@ -947,7 +1078,51 @@
def _p_resolveConflict(self, old, state1, state2):
return max(old, state1, state2)
- def _p_independent(self):
- return 1
+class Length2(Persistent):
+ """
+ A persistent object responsible for maintaining a representation of
+ the number of current transient objects.
+
+ Conflict resolution is sensitive to which methods are used to
+ change the length.
+ """
+ def __init__(self, value=0):
+ self.set(value)
+
+ def set(self, value):
+ self.value = value
+ self.floor = 0
+ self.ceiling = value
+
+ def increment(self, delta):
+ """Increase the length by delta.
+
+ Conflict resolution will take the sum of all the increments."""
+ self.ceiling += delta
+ self.value += delta
+
+ def decrement(self, delta):
+ """Decrease the length by delta.
+
+ Conflict resolution will take the highest decrement."""
+ self.floor += delta
+ self.value -= delta
+
+ def __getstate__(self):
+ return self.__dict__
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __call__(self):
+ return self.value
+
+ def _p_resolveConflict(self, old, saved, new):
+ new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
+ new['floor'] = max(old['floor'], saved['floor'], new['floor'])
+ new['value'] = new['ceiling'] - new['floor']
+ return new
+
Globals.InitializeClass(TransientObjectContainer)
+
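The Length2 conflict resolution above sums concurrent increments and
takes the largest decrement, as exercised by the new testCounters.py
below. A worked example of _p_resolveConflict with the same values the
test uses:

    # old: the state both transactions started from (set(0))
    old   = {'value': 0,  'floor': 0, 'ceiling': 0}
    # saved: committed first  (increment(10); decrement(1))
    saved = {'value': 9,  'floor': 1, 'ceiling': 10}
    # new: committed second   (increment(20); decrement(2))
    new   = {'value': 18, 'floor': 2, 'ceiling': 20}

    new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
    # -> 10 + 20 - 0 = 30: increments are summed
    new['floor'] = max(old['floor'], saved['floor'], new['floor'])
    # -> max(0, 1, 2) = 2: the largest decrement wins
    new['value'] = new['ceiling'] - new['floor']
    # -> 28, matching TestLength2 (10 + 20 - max(1, 2))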
Modified: Zope/trunk/lib/python/Products/Transience/TransientObject.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/TransientObject.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/TransientObject.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -16,6 +16,8 @@
$Id$
"""
+__version__='$Revision: 1.9.68.5 $'[11:-2]
+
from Persistence import Persistent
from Acquisition import Implicit
import time, random, sys, os
@@ -192,70 +194,60 @@
# Other non interface code
#
- def _p_independent(self):
- # My state doesn't depend on or materially effect the state of
- # other objects (eliminates read conflicts).
- return 1
-
def _p_resolveConflict(self, saved, state1, state2):
DEBUG and TLOG('entering TO _p_rc')
DEBUG and TLOG('states: sv: %s, s1: %s, s2: %s' % (
saved, state1, state2))
- try:
- states = [saved, state1, state2]
+ states = [saved, state1, state2]
- # We can clearly resolve the conflict if one state is invalid,
- # because it's a terminal state.
- for state in states:
- if state.has_key('_invalid'):
- DEBUG and TLOG('TO _p_rc: a state was invalid')
- return state
- # The only other times we can clearly resolve the conflict is if
- # the token, the id, or the creation time don't differ between
- # the three states, so we check that here. If any differ, we punt
- # by raising ConflictError.
- attrs = ['token', 'id', '_created']
- for attr in attrs:
- svattr = saved.get(attr)
- s1attr = state1.get(attr)
- s2attr = state2.get(attr)
- DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
- (attr, svattr, s1attr, s2attr))
- if not svattr==s1attr==s2attr:
- DEBUG and TLOG('TO _p_rc: cant resolve conflict')
- raise ConflictError
+ # We can clearly resolve the conflict if one state is invalid,
+ # because it's a terminal state.
+ for state in states:
+ if state.has_key('_invalid'):
+ DEBUG and TLOG('TO _p_rc: a state was invalid')
+ return state
- # Now we need to do real work.
- #
- # Data in our _container dictionaries might conflict. To make
- # things simple, we intentionally create a race condition where the
- # state which was last modified "wins". It would be preferable to
- # somehow merge our _containers together, but as there's no
- # generally acceptable way to union their states, there's not much
- # we can do about it if we want to be able to resolve this kind of
- # conflict.
+ # The only other times we can clearly resolve the conflict is if
+ # the token, the id, or the creation time don't differ between
+ # the three states, so we check that here. If any differ, we punt
+ # by raising ConflictError.
+ attrs = ['token', 'id', '_created']
+ for attr in attrs:
+ svattr = saved.get(attr)
+ s1attr = state1.get(attr)
+ s2attr = state2.get(attr)
+ DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
+ (attr, svattr, s1attr, s2attr))
+ if not svattr==s1attr==s2attr:
+ DEBUG and TLOG('TO _p_rc: cant resolve conflict')
+ raise ConflictError
- # We return the state which was most recently modified, if
- # possible.
- states.sort(lastmodified_sort)
- if states[0].get('_last_modified'):
- DEBUG and TLOG('TO _p_rc: returning last mod state')
- return states[0]
+ # Now we need to do real work.
+ #
+ # Data in our _container dictionaries might conflict. To make
+ # things simple, we intentionally create a race condition where the
+ # state which was last modified "wins". It would be preferable to
+ # somehow merge our _containers together, but as there's no
+ # generally acceptable way to union their states, there's not much
+ # we can do about it if we want to be able to resolve this kind of
+ # conflict.
- # If we can't determine which object to return on the basis
- # of last modification time (no state has been modified), we return
- # the object that was most recently accessed (last pulled out of
- # our parent). This will return an essentially arbitrary state if
- # all last_accessed values are equal.
- states.sort(lastaccessed_sort)
- DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+ # We return the state which was most recently modified, if
+ # possible.
+ states.sort(lastmodified_sort)
+ if states[0].get('_last_modified'):
+ DEBUG and TLOG('TO _p_rc: returning last mod state')
return states[0]
- except ConflictError:
- raise
- except:
- LOG.info('Conflict resolution error in TransientObject',
- exc_info=sys.exc_info())
+ # If we can't determine which object to return on the basis
+ # of last modification time (no state has been modified), we return
+ # the object that was most recently accessed (last pulled out of
+ # our parent). This will return an essentially arbitrary state if
+ # all last_accessed values are equal.
+ states.sort(lastaccessed_sort)
+ DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+ return states[0]
+
getName = getId # this is for SQLSession compatibility
def _generateUniqueId(self):
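The _p_resolveConflict change above is mostly a re-indent: the
enclosing try/except that logged and swallowed resolution errors is
removed, while the "last modified wins" policy stays. A minimal sketch
of that policy (lastmodified_sort is not shown in this diff, so the
comparator below is an assumption about its behavior):

    def lastmodified_sort(d1, d2):
        # order state dicts newest-first by _last_modified
        return cmp(d2.get('_last_modified'), d1.get('_last_modified'))

    states = [saved, state1, state2]  # hypothetical state dictionaries
    states.sort(lastmodified_sort)
    winner = states[0]                # the most recently modified state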
Modified: Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml
===================================================================
--- Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml 2004-09-18 03:27:57 UTC (rev 27630)
@@ -13,7 +13,7 @@
(the "data object timeout") after which it will be flushed.
</p>
-<dtml-call nudge><!-- turn the buckets if necessary -->
+<dtml-call housekeep><!-- turn the buckets if necessary -->
<p class="form-label">
<font color="green">
Added: Zope/trunk/lib/python/Products/Transience/tests/testCounters.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testCounters.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testCounters.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,99 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import os
+from unittest import TestCase, TestSuite, makeSuite
+from ZODB.POSException import ConflictError
+from ZODB.FileStorage import FileStorage
+from ZODB.DB import DB
+
+from Products.Transience.Transience import Length2, Increaser
+
+class Base(TestCase):
+ db = None
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ if self.db is not None:
+ self.db.close()
+ self.storage.cleanup()
+
+ def openDB(self):
+ n = 'fs_tmp__%s' % os.getpid()
+ self.storage = FileStorage(n)
+ self.db = DB(self.storage)
+
+class TestLength2(Base):
+
+ def testConflict(self):
+ # this test fails on the HEAD (MVCC?)
+ self.openDB()
+ length = Length2(0)
+
+ r1 = self.db.open().root()
+ r1['ob'] = length
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ copy = r2['ob']
+ # The following ensures that copy is loaded.
+ self.assertEqual(copy(), 0)
+
+ # First transaction.
+ length.increment(10)
+ length.decrement(1)
+ get_transaction().commit()
+
+ # Second transaction.
+ length = copy
+ length.increment(20)
+ length.decrement(2)
+ get_transaction().commit()
+
+ self.assertEqual(length(), 10+20-max(1,2))
+
+class TestIncreaser(Base):
+
+ def testConflict(self):
+ self.openDB()
+ increaser = Increaser(0)
+
+ r1 = self.db.open().root()
+ r1['ob'] = increaser
+ get_transaction().commit()
+
+ r2 = self.db.open().root()
+ copy = r2['ob']
+ # The following ensures that copy is loaded.
+ self.assertEqual(copy(), 0)
+
+ # First transaction.
+ increaser.set(10)
+ get_transaction().commit()
+
+ # Second transaction.
+ increaser = copy
+ increaser.set(20)
+ get_transaction().commit()
+
+ self.assertEqual(increaser(), 20)
+
+def test_suite():
+ suite = TestSuite()
+ suite.addTest(makeSuite(TestLength2))
+ suite.addTest(makeSuite(TestIncreaser))
+ return suite
Added: Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,40 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import sys, os, time, random, unittest
+
+if __name__ == "__main__":
+ sys.path.insert(0, '../../..')
+
+import ZODB
+from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
+from Products.Transience.TransactionHelper import PreventTransactionCommit, \
+ makeTransactionUncommittable
+
+class TestTransactionHelper(TestCase):
+ def setUp(self):
+ self.t = get_transaction()
+
+ def tearDown(self):
+ self.t = None
+
+ def testUncommittable(self):
+ makeTransactionUncommittable(self.t, "test")
+ self.assertRaises(PreventTransactionCommit, get_transaction().commit)
+
+def test_suite():
+ suite = makeSuite(TestTransactionHelper, 'test')
+ return suite
+
+if __name__ == '__main__':
+ runner = TextTestRunner(verbosity=9)
+ runner.run(test_suite())
Modified: Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py 2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py 2004-09-18 03:27:57 UTC (rev 27630)
@@ -17,7 +17,7 @@
import ZODB
from Products.Transience.Transience import TransientObjectContainer,\
- MaxTransientObjectsExceeded
+ MaxTransientObjectsExceeded, SPARE_BUCKETS, getCurrentTimeslice
from Products.Transience.TransientObject import TransientObject
import Products.Transience.Transience
import Products.Transience.TransientObject
@@ -380,6 +380,18 @@
fauxtime.sleep(180)
self.assertEqual(len(self.t.keys()), 100)
+ def testGarbageCollection(self):
+ # this is pretty implementation-dependent :-(
+ for x in range(0, 100):
+ self.t[x] = x
+ sleeptime = self.period * SPARE_BUCKETS
+ fauxtime.sleep(sleeptime)
+ self.t._invoke_finalize_and_gc()
+ max_ts = self.t._last_finalized_timeslice()
+ keys = list(self.t._data.keys())
+ for k in keys:
self.assert_(k > max_ts, "k %s <= max_ts %s" % (k, max_ts))
+
def _maxOut(self):
for x in range(11):
self.t.new(str(x))