[Zope-Checkins] CVS: Products/Transience -
TransactionHelper.py:1.1.4.1 HowTransienceWorks.stx:1.1.74.3
Transience.py:1.32.12.9 TransientObject.py:1.9.68.5
Chris McDonough
chrism at plope.com
Fri Sep 17 22:58:49 EDT 2004
Update of /cvs-repository/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv394
Modified Files:
Tag: Zope-2_7-branch
HowTransienceWorks.stx Transience.py TransientObject.py
Added Files:
Tag: Zope-2_7-branch
TransactionHelper.py
Log Message:
Merge chrism-pre273-branch to Zope-2_7-branch. This checkin, plus others
made to the ZODB 3.2 branch included with the Zope 2.7 branch, fixes all
known sessioning issues to date.
Changes:
- TransientObject conflict resolver could potentially fail; when it
failed, the conflict resolution machinery could resolve the
TransientObject to None. (never reported)
- Add a knob (not exposed in the UI) to turn off "inband" housekeeping.
Housekeeping can now optionally be done using an external scheduling
facility by calling the "housekeep" method regularly.
- Break out actual work that _gc and _finalize do into separate _do methods
for legibility.
- Don't raise Retry in _replentish if we're in a bucket shortage and we can't
get the lock. Instead, just soldier on and let the conflict happen naturally.
- Create a "roll" function and attempt to prevent conflicts in _gc by using a
roll.
- Remove "nudge" function in favor of "housekeep".
- The indicators for "gc required" and "replentish required" are simpler.
- Replace BTrees.Length.Length with dunny's Length (not _p_independent) in
order to get "number of objects in data container" right.
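For illustration, here is a minimal standalone sketch of the
conflict-resolution arithmetic used by the new length object (it is not
part of the checkin; the real class is Length2 in the Transience.py diff
below). Increments from competing transactions are summed through a
"ceiling" counter, while competing decrements keep only the largest
"floor", so concurrent adds are never lost:

def resolve(old, saved, new):
    # sum the increments made by both competing transactions
    resolved = new.copy()
    resolved['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
    # keep only the largest decrement; several threads may have
    # attempted the same finalization work at once
    resolved['floor'] = max(old['floor'], saved['floor'], new['floor'])
    resolved['value'] = resolved['ceiling'] - resolved['floor']
    return resolved

# two transactions each add one item, starting from a length of 10
old   = {'value': 10, 'ceiling': 10, 'floor': 0}
saved = {'value': 11, 'ceiling': 11, 'floor': 0}  # committed first
new   = {'value': 11, 'ceiling': 11, 'floor': 0}  # conflicts at commit
print resolve(old, saved, new)['value']  # -> 12: both additions survive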
=== Added File Products/Transience/TransactionHelper.py ===
import time

class PreventTransactionCommit(Exception):
    def __init__(self, reason):
        self.reason = reason

    def __str__(self):
        return "Uncommittable transaction: %s" % self.reason

class UncommittableJar:
    """ A jar that cannot be committed """
    def __init__(self, reason):
        self.reason = reason
        self.time = time.time()

    def sort_key(self):
        # self.time is a timestamp value, not a callable
        return self.time

    def tpc_begin(self, transaction):
        pass

    def commit(self, obj, transaction):
        pass

    def tpc_vote(self, transaction):
        raise PreventTransactionCommit(self.reason)

class makeTransactionUncommittable:
    """
    - register an uncommittable object with the provided transaction
      which prevents the commit of that transaction
    """
    def __init__(self, transaction, reason):
        self._p_jar = UncommittableJar(reason)
        transaction.register(self)
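A hypothetical usage sketch (not part of the checkin), assuming the
Zope 2.7-era get_transaction() builtin; registering the helper makes the
transaction fail at vote time rather than commit:

from Products.Transience.TransactionHelper import \
     makeTransactionUncommittable, PreventTransactionCommit

t = get_transaction()
makeTransactionUncommittable(t, 'unit test in progress')
try:
    t.commit()   # UncommittableJar.tpc_vote raises at vote time
except PreventTransactionCommit, e:
    print e      # Uncommittable transaction: unit test in progress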
=== Products/Transience/HowTransienceWorks.stx 1.1.74.2 => 1.1.74.3 ===
--- Products/Transience/HowTransienceWorks.stx:1.1.74.2 Sun May 30 03:56:35 2004
+++ Products/Transience/HowTransienceWorks.stx Fri Sep 17 22:58:19 2004
@@ -42,6 +42,7 @@
inside of the "_data" structure. There is a concept of a
"current" bucket, which is the bucket that is contained within the
_data structure with a key equal to the "current" timeslice.
+ A current bucket must always exist (this is an invariant).
- A "max_timeslice" integer, which is equal to the "largest"
timeslice for which there exists a bucket in the _data structure.
@@ -74,10 +75,13 @@
Replentishing
The TOC performs "finalization", "garbage collection", and "bucket
- replentishing". It performs these tasks "in-band". This means that
- the TOC does not maintain a separate thread that wakes up every so
- often to do these housekeeping tasks. Instead, during the course of
- normal operations, the TOC opportunistically performs them.
+ replentishing". It typically performs these tasks "in-band"
+ (although it is possible to do the housekeeping tasks "out of band"
+ as well: see the methods of the Transient Object Container with
+ "housekeep" in their names). "In band" housekeeping implies that
+ the TOC does not maintain a separate thread or process that wakes up
+ every so often to clean up. Instead, during the course of normal
+ operations, the TOC opportunistically performs housekeeping functions.
Finalization is defined as optionally calling a function at bucket
expiration time against all transient objects contained within that
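A rough sketch of driving the "out of band" alternative from an
external scheduler (the container lookup and the sleep interval are
assumptions for illustration; disableInbandHousekeeping and housekeep
are the real methods, per the Transience.py diff below):

import time

# 'toc' stands in for a real Transient Object Container instance
toc = get_transient_object_container()   # hypothetical lookup helper
toc.disableInbandHousekeeping()
while 1:
    # finalize, replentish and gc in one call; the docstring asks for
    # a call at least every self._period * (SPARE_BUCKETS - 1) seconds
    toc.housekeep()
    time.sleep(60)   # assumed interval; small enough for the default period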
=== Products/Transience/Transience.py 1.32.12.8 => 1.32.12.9 ===
--- Products/Transience/Transience.py:1.32.12.8 Sat Jul 3 20:21:01 2004
+++ Products/Transience/Transience.py Fri Sep 17 22:58:19 2004
@@ -30,13 +30,13 @@
TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
StringKeyedHomogeneousItemContainer, TransientItemContainer
-from BTrees.Length import Length
+from BTrees.Length import Length as BTreesLength
from BTrees.OOBTree import OOBTree
from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
from Persistence import Persistent
from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager, \
setSecurityManager
@@ -129,6 +129,7 @@
_limit = 0
_data = None
+ _inband_housekeeping = True
security.setDefaultAccess('deny')
@@ -206,7 +207,7 @@
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
- # _gc() and _replentish()).
+ # _replentish() and _gc()).
self._data = DATA_CLASS()
# populate _data with some number of buckets, each of which
@@ -232,6 +233,10 @@
# each expired item.
self._last_finalized_timeslice = Increaser(-self._period)
+ # '_last_gc_timeslice' is a value that indicates in which
+ # timeslice the garbage collection process was last run.
+ self._last_gc_timeslice = Increaser(-self._period)
+
# our "_length" is the number of "active" data objects in _data.
# it does not include items that are still kept in _data but need to
# be garbage collected.
@@ -239,8 +244,10 @@
# we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive, and it
# doesn't really tell us which ones are "active" anyway.
- try: self._length.set(0)
- except AttributeError: self._length = self.getLen = Length()
+ try:
+ self._length.set(0)
+ except AttributeError:
+ self._length = self.getLen = Length2()
def _getCurrentSlices(self, now):
if self._timeout_slices:
@@ -269,15 +276,14 @@
bucket = self._data.get(0)
return bucket.get(k, default)
- # always call finalize
- self._finalize(current_ts)
-
- # call gc and/or replentish on an only-as needed basis
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
+ if self._inband_housekeeping:
+ self._housekeep(current_ts)
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
+ else:
+ # don't allow the TOC to stop working in an emergency bucket
+ # shortage
+ if self._in_emergency_bucket_shortage(current_ts):
+ self._replentish(current_ts)
# SUBTLETY ALERT TO SELF: do not "improve" the code below
# unnecessarily, as it will end only in tears. The lack of aliases
@@ -288,7 +294,8 @@
found_ts = None
for ts in current_slices:
- abucket = self._data.get(ts, None)
+ abucket = self._data.get(ts, None) # XXX ReadConflictError hotspot
+
if abucket is None:
DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
continue
@@ -348,13 +355,12 @@
else:
current_ts = 0
- self._finalize(current_ts)
+ if self._inband_housekeeping:
+ self._housekeep(current_ts)
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
-
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
+ elif self._in_emergency_bucket_shortage(current_ts):
+ # if our scheduler fails, don't allow the TOC to stop working
+ self._replentish(current_ts, force=True)
STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts)
@@ -374,8 +380,8 @@
def keys(self):
return self._all().keys()
- def rawkeys(self, current_ts):
- # for debugging
+ def raw(self, current_ts):
+ # for debugging and unit testing
current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer
@@ -425,15 +431,20 @@
STRICT and _assert(self._data.has_key(current_ts))
if item is _marker:
# the key didnt already exist, this is a new item
- if self._limit and len(self) >= self._limit:
+
+ length = self._length() # XXX ReadConflictError hotspot
+
+ if self._limit and length >= self._limit:
LOG('Transience', WARNING,
('Transient object container %s max subobjects '
'reached' % self.getId())
)
raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" %
- (len(self), self._limit))
- self._length.change(1)
+ (length, self._limit))
+
+ self._length.increment(1)
+
DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' %
(k, current_ts))
current_bucket = self._data[current_ts]
@@ -460,7 +471,11 @@
if not issubclass(BUCKET_CLASS, Persistent):
# tickle persistence machinery
self._data[current_ts] = bucket
- self._length.change(-1)
+
+ # XXX does increment(-1) make any sense here?
+ # rationale from dunny: we are removing an item rather than simply
+ # declaring it to be unused?
+ self._length.increment(-1)
return current_ts, item
def __len__(self):
@@ -496,78 +511,45 @@
DEBUG and TLOG('has_key: returning false from for %s' % k)
return False
- def _roll(self, now, reason):
- """
- Roll the dice to see if we're the lucky thread that does
- bucket replentishment or gc. This method is guaranteed to return
- true at some point as the difference between high and low naturally
- diminishes to zero.
-
- The reason we do the 'random' dance in the last part of this
- is to minimize the chance that two threads will attempt to
- do housekeeping at the same time (causing conflicts).
- """
- low = now/self._period
- high = self._max_timeslice()/self._period
- if high <= low:
- # we really need to win this roll because we have no
- # spare buckets (and no valid values to provide to randrange), so
- # we rig the toss.
- DEBUG and TLOG('_roll: %s rigged toss' % reason)
- return True
- else:
- # we're not in an emergency bucket shortage, so we can
- # take our chances during the roll. It's unlikely that
- # two threads will win the roll simultaneously, so we
- # avoid a certain class of conflicts here.
- if random.randrange(low, high) == low: # WINNAH!
- DEBUG and TLOG("_roll: %s roll winner" % reason)
- return True
- DEBUG and TLOG("_roll: %s roll loser" % reason)
- return False
-
def _get_max_expired_ts(self, now):
return now - (self._period * (self._timeout_slices + 1))
+ def _in_emergency_bucket_shortage(self, now):
+ max_ts = self._max_timeslice()
+ low = now/self._period
+ high = max_ts/self._period
+ required = high <= low
+ return required
+
def _finalize(self, now):
+ """ Call finalization handlers for the data in each stale bucket """
if not self._timeout_slices:
DEBUG and TLOG('_finalize: doing nothing (no timeout)')
return # don't do any finalization if there is no timeout
# The nature of sessioning is that when the timeslice rolls
# over, all active threads will try to do a lot of work during
- # finalization, all but one unnecessarily. We really don't
- # want more than one thread at a time to try to finalize
- # buckets at the same time so we try to lock. We give up if we
- # can't lock immediately because it doesn't matter if we skip
- # a couple of opportunities for finalization, as long as it
- # gets done by some thread eventually. A similar pattern
- # exists for _gc and _replentish.
+ # finalization if inband housekeeping is enabled, all but one
+ # unnecessarily. We really don't want more than one thread at
+ # a time to try to finalize buckets at the same time so we try
+ # to lock. We give up if we can't lock immediately because it
+ # doesn't matter if we skip a couple of opportunities for
+ # finalization, as long as it gets done by some thread
+ # eventually. A similar pattern exists for _gc and
+ # _replentish.
if not self.finalize_lock.acquire(0):
- DEBUG and TLOG('_finalize: couldnt acquire lock')
+ DEBUG and TLOG('_finalize: could not acquire lock, returning')
return
try:
DEBUG and TLOG('_finalize: lock acquired successfully')
-
- if now is None:
- now = getCurrentTimeslice(self._period) # for unit tests
+ last_finalized = self._last_finalized_timeslice()
# we want to start finalizing from one timeslice after the
- # timeslice which we last finalized. Note that finalizing
- # an already-finalized bucket somehow sends persistence
- # into a spin with an exception later raised:
- # "SystemError: error return without exception set",
- # typically coming from
- # Products.Sessions.SessionDataManager, line 182, in
- # _getSessionDataObject (if getattr(ob, '__of__', None)
- # and getattr(ob, 'aq_parent', None)). According to this
- # email message from Jim, it may be because the ob is
- # ghosted and doesn't have a _p_jar somehow:
- #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
-
- start_finalize = self._last_finalized_timeslice() + self._period
+ # timeslice which we last finalized.
+
+ start_finalize = last_finalized + self._period
# we want to finalize only up to the maximum expired timeslice
max_ts = self._get_max_expired_ts(now)
@@ -577,124 +559,221 @@
'_finalize: start_finalize (%s) >= max_ts (%s), '
'doing nothing' % (start_finalize, max_ts))
return
-
- DEBUG and TLOG('_finalize: now is %s' % now)
- DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
- DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
-
- to_finalize = list(self._data.keys(start_finalize, max_ts))
- DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
-
- delta = 0
-
- for key in to_finalize:
+ else:
+ DEBUG and TLOG(
+ '_finalize: start_finalize (%s) <= max_ts (%s), '
+ 'finalization possible' % (start_finalize, max_ts))
+ # we don't try to avoid conflicts here by doing a "random"
+ # dance (ala _replentish and _gc) because it's important that
+ # buckets are finalized as soon as possible after they've
+ # expired in order to call the delete notifier "on time".
+ self._do_finalize_work(now, max_ts, start_finalize)
- assert(start_finalize <= key <= max_ts)
- STRICT and _assert(self._data.has_key(key))
- values = list(self._data[key].values())
- DEBUG and TLOG('_finalize: values to notify from ts %s '
- 'are %s' % (key, `list(values)`))
+ finally:
+ self.finalize_lock.release()
- delta += len(values)
+ def _do_finalize_work(self, now, max_ts, start_finalize):
+ # this is only separated from _finalize for readability; it
+ # should generally not be called by anything but _finalize
+ DEBUG and TLOG('_do_finalize_work: entering')
+ DEBUG and TLOG('_do_finalize_work: now is %s' % now)
+ DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
+ DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
+ start_finalize)
+
+ to_finalize = list(self._data.keys(start_finalize, max_ts))
+ DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
+
+ delta = 0
+
+ for key in to_finalize:
+
+ _assert(start_finalize <= key)
+ _assert(key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ values = list(self._data[key].values())
+ DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
+ 'are %s' % (key, `list(values)`))
+
+ delta += len(values)
+
+ for v in values:
+ self.notifyDel(v)
+
+ if delta:
+ self._length.decrement(delta)
+
+ DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
+ 'to max_ts of %s' % max_ts)
+
+ self._last_finalized_timeslice.set(max_ts)
+
+ def _invoke_finalize_and_gc(self):
+ # for unit testing purposes only!
+ last_finalized = self._last_finalized_timeslice()
+ now = getCurrentTimeslice(self._period) # for unit tests
+ start_finalize = last_finalized + self._period
+ max_ts = self._get_max_expired_ts(now)
+ self._do_finalize_work(now, max_ts, start_finalize)
+ self._do_gc_work(now)
- for v in values:
- self.notifyDel(v)
+ def _replentish(self, now):
+ """ Add 'fresh' future or current buckets """
+ if not self._timeout_slices:
+ DEBUG and TLOG('_replentish: no timeout, doing nothing')
+ return
+
+ # the difference between high and low naturally diminishes to
+ # zero as now approaches self._max_timeslice() during normal
+ # operations. If high <= low, it means we have no current bucket,
+ # so we *really* need to replentish (having a current bucket is
+ # an invariant for continued operation).
- if delta:
- self._length.change(-delta)
+ required = self._in_emergency_bucket_shortage(now)
+ lock_acquired = self.replentish_lock.acquire(0)
- DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
- 'to max_ts of %s' % max_ts)
+ try:
+ if required:
+ # we're in an emergency bucket shortage, we need to
+ # replentish regardless of whether we got the lock or
+ # not. (if we didn't get the lock, this transaction
+ # will likely result in a conflict error, that's ok)
+ if lock_acquired:
+ DEBUG and TLOG('_replentish: required, lock acquired')
+ else:
+ DEBUG and TLOG('_replentish: required, lock NOT acquired')
+ max_ts = self._max_timeslice()
+ self._do_replentish_work(now, max_ts)
+
+ elif lock_acquired:
+ # If replentish is optional, minimize the chance that
+ # two threads will attempt to do replentish work at
+ # the same time (which causes conflicts) by
+ # introducing a random element.
+ DEBUG and TLOG('_replentish: attempting optional replentish '
+ '(lock acquired)')
+ max_ts = self._max_timeslice()
+ low = now/self._period
+ high = max_ts/self._period
+ if roll(low, high, 'optional replentish'):
+ self._do_replentish_work(now, max_ts)
- self._last_finalized_timeslice.set(max_ts)
+ else:
+ # This is an optional replentish and we can't acquire
+ # the lock, bail.
+ DEBUG and TLOG('_replentish: optional attempt aborted, could '
+ 'not acquire lock.')
+ return
finally:
- self.finalize_lock.release()
+ if lock_acquired:
+ self.replentish_lock.release()
- def _replentish(self, now):
- # available_spares == the number of "spare" buckets that exist in
- # "_data"
- if not self._timeout_slices:
- return # do nothing if no timeout
-
- if not self.replentish_lock.acquire(0):
- DEBUG and TLOG('_replentish: couldnt acquire lock')
+ def _do_replentish_work(self, now, max_ts):
+ DEBUG and TLOG('_do_replentish_work: entering')
+ # this is only separated from _replentish for readability; it
+ # should generally not be called by anything but _replentish
+
+ # available_spares == the number of "spare" buckets that exist
+ # in "_data"
+ available_spares = (max_ts - now) / self._period
+ DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+ DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+ DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+ % available_spares)
+
+ if available_spares >= SPARE_BUCKETS:
+ DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+ 'SPARE_BUCKETS (%s), doing '
+ 'nothing'% (available_spares,
+ SPARE_BUCKETS))
return
- try:
- max_ts = self._max_timeslice()
- available_spares = (max_ts-now) / self._period
- DEBUG and TLOG('_replentish: now = %s' % now)
- DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
- DEBUG and TLOG('_replentish: available_spares = %s'
- % available_spares)
-
- if available_spares >= SPARE_BUCKETS:
- DEBUG and TLOG('_replentish: available_spares (%s) >= '
- 'SPARE_BUCKETS (%s), doing '
- 'nothing'% (available_spares,
- SPARE_BUCKETS))
- return
+ if max_ts < now:
+ # the newest bucket in self._data is older than now!
+ replentish_start = now
+ replentish_end = now + (self._period * SPARE_BUCKETS)
- if max_ts < now:
- replentish_start = now
- replentish_end = now + (self._period * SPARE_BUCKETS)
+ else:
+ replentish_start = max_ts + self._period
+ replentish_end = max_ts + (self._period * (SPARE_BUCKETS +1))
- else:
- replentish_start = max_ts + self._period
- replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+ DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+ replentish_start)
+ DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+ % replentish_end)
+ # n is the number of buckets to create
+ n = (replentish_end - replentish_start) / self._period
+ new_buckets = getTimeslices(replentish_start, n, self._period)
+ new_buckets.reverse()
+ STRICT and _assert(new_buckets)
+ DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+ DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+ % new_buckets)
+ for k in new_buckets:
+ STRICT and _assert(not self._data.has_key(k))
+ self._data[k] = BUCKET_CLASS() # XXX ReadConflictError hotspot
- DEBUG and TLOG('_replentish: replentish_start = %s' %
- replentish_start)
- DEBUG and TLOG('_replentish: replentish_end = %s'
- % replentish_end)
- # n is the number of buckets to create
- n = (replentish_end - replentish_start) / self._period
- new_buckets = getTimeslices(replentish_start, n, self._period)
- new_buckets.reverse()
- STRICT and _assert(new_buckets)
- DEBUG and TLOG('_replentish: adding %s new buckets' % n)
- DEBUG and TLOG('_replentish: buckets to add = %s'
- % new_buckets)
- for k in new_buckets:
- STRICT and _assert(not self._data.has_key(k))
- try:
- self._data[k] = BUCKET_CLASS()
- except ConflictError:
- DEBUG and TLOG('_replentish: conflict when adding %s' % k)
- time.sleep(random.uniform(0, 1)) # add entropy
- raise
- self._max_timeslice.set(max(new_buckets))
- finally:
- self.replentish_lock.release()
+ self._max_timeslice.set(max(new_buckets))
def _gc(self, now=None):
+ """ Remove stale buckets """
if not self._timeout_slices:
return # dont do gc if there is no timeout
+ # give callers a good chance to do nothing (gc isn't as important
+ # as replentishment or finalization)
+ if not roll(0, 5, 'gc'):
+ DEBUG and TLOG('_gc: lost roll, doing nothing')
+ return
+
if not self.gc_lock.acquire(0):
DEBUG and TLOG('_gc: couldnt acquire lock')
return
- try:
+ try:
if now is None:
now = getCurrentTimeslice(self._period) # for unit tests
- # we want to garbage collect all buckets that have already been run
- # through finalization
- max_ts = self._last_finalized_timeslice()
-
- DEBUG and TLOG('_gc: now is %s' % now)
- DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
-
- for key in list(self._data.keys(None, max_ts)):
- assert(key <= max_ts)
- STRICT and _assert(self._data.has_key(key))
- DEBUG and TLOG('deleting %s from _data' % key)
- del self._data[key]
+ last_gc = self._last_gc_timeslice()
+ gc_every = self._period * round(SPARE_BUCKETS / 2.0)
+
+ if (now - last_gc) < gc_every:
+ DEBUG and TLOG('_gc: gc attempt not yet required '
+ '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+ return
+ else:
+ DEBUG and TLOG(
+ '_gc: (%s -%s) > %s, gc invoked' % (now, last_gc,
+ gc_every))
+ self._do_gc_work(now)
+
finally:
self.gc_lock.release()
+ def _do_gc_work(self, now):
+ # this is only separated from _gc for readability; it should
+ # generally not be called by anything but _gc
+
+ # we garbage collect any buckets that have already been run
+ # through finalization
+ DEBUG and TLOG('_do_gc_work: entering')
+
+ max_ts = self._last_finalized_timeslice()
+
+ DEBUG and TLOG('_do_gc_work: max_ts is %s' % max_ts)
+ to_gc = list(self._data.keys(None, max_ts))
+ DEBUG and TLOG('_do_gc_work: to_gc is: %s' % str(to_gc))
+
+ for key in to_gc:
+ _assert(key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ DEBUG and TLOG('_do_gc_work: deleting %s from _data' % key)
+ del self._data[key]
+
+ DEBUG and TLOG('_do_gc_work: setting last_gc_timeslice to %s' % now)
+ self._last_gc_timeslice.set(now)
+
def notifyAdd(self, item):
DEBUG and TLOG('notifyAdd with %s' % item)
callback = self._getCallback(self._addCallback)
@@ -830,12 +909,36 @@
def setDelNotificationTarget(self, f):
self._delCallback = f
- security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
- def nudge(self):
- """ Used by mgmt interface to maybe do housekeeping each time
- a screen is shown """
- # run garbage collector so view is correct
- self._gc()
+ security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
+ def disableInbandHousekeeping(self):
+ """ No longer perform inband housekeeping """
+ self._inband_housekeeping = False
+
+ security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
+ def enableInbandHousekeeping(self):
+ """ (Re)enable inband housekeeping """
+ self._inband_housekeeping = True
+
+ security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
+ def isInbandHousekeepingEnabled(self):
+ """ Report if inband housekeeping is enabled """
+ return self._inband_housekeeping
+
+ security.declareProtected('View', 'housekeep')
+ def housekeep(self):
+ """ Call this from a scheduler at least every
+ self._period * (SPARE_BUCKETS - 1) seconds to perform out of band
+ housekeeping """
+ # we can protect this method from being called too often by
+ # anonymous users as necessary in the future; we already have a lot
+ # of protection as-is though so no need to make it more complicated
+ # than necessary at the moment
+ self._housekeep(getCurrentTimeslice(self._period))
+
+ def _housekeep(self, now):
+ self._finalize(now)
+ self._replentish(now)
+ self._gc(now)
security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer')
@@ -868,9 +971,17 @@
# f/w compat: 2.8 cannot use __len__ as an instance variable
if not state.has_key('_length'):
- length = state.get('__len__', Length())
+ length = state.get('__len__', Length2())
self._length = self.getLen = length
+ oldlength = state['_length']
+ if isinstance(oldlength, BTreesLength):
+ # TOCS prior to 2.7.3 had a BTrees.Length.Length object as
+ # the TOC length object, replace it with our own Length2
+ # that does our conflict resolution correctly:
+ sz = oldlength()
+ self._length = self.getLen = Length2(sz)
+
# TOCs prior to 2.7.1 took their period from a global
if not state.has_key('_period'):
self._period = 20 # this was the default for all prior releases
@@ -891,6 +1002,10 @@
if not state.has_key('_last_finalized_timeslice'):
self._last_finalized_timeslice = Increaser(-self._period)
+ # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+ if not state.has_key('_last_gc_timeslice'):
+ self._last_gc_timeslice = Increaser(-self._period)
+
# we should probably delete older attributes from state such as
# '_last_timeslice', '_deindex_next',and '__len__' here but we leave
# them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -919,6 +1034,22 @@
l.insert(0, begin + (x * period))
return l
+def roll(low, high, reason):
+ try:
+ result = random.randrange(low, high)
+ except ValueError:
+ # empty range, must win this roll
+ result = low
+
+ if result == low:
+ DEBUG and TLOG('roll: low: %s, high: %s: won with %s (%s)' %
+ (low, high, result, reason))
+ return True
+ else:
+ DEBUG and TLOG('roll: low: %s, high: %s: lost with %s (%s)' %
+ (low, high, result, reason))
+ return False
+
def _assert(case):
if not case:
raise AssertionError
@@ -926,8 +1057,8 @@
class Increaser(Persistent):
"""
A persistent object representing a typically increasing integer that
- has conflict resolution uses the greatest integer out of the three
- available states
+ has conflict resolution which uses the greatest integer out of the three
+ available states.
"""
def __init__(self, v):
self.value = v
@@ -947,7 +1078,51 @@
def _p_resolveConflict(self, old, state1, state2):
return max(old, state1, state2)
- def _p_independent(self):
- return 1
+
+class Length2(Persistent):
+ """
+ A persistent object responsible for maintaining a representation of
+ the number of current transient objects.
+
+ Conflict resolution is sensitive to which methods are used to
+ change the length.
+ """
+ def __init__(self, value=0):
+ self.set(value)
+
+ def set(self, value):
+ self.value = value
+ self.floor = 0
+ self.ceiling = value
+
+ def increment(self, delta):
+ """Increase the length by delta.
+
+ Conflict resolution will take the sum of all the increments."""
+ self.ceiling += delta
+ self.value += delta
+
+ def decrement(self, delta):
+ """Decrease the length by delta.
+
+ Conflict resolution will take the highest decrement."""
+ self.floor += delta
+ self.value -= delta
+
+ def __getstate__(self):
+ return self.__dict__
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __call__(self):
+ return self.value
+
+ def _p_resolveConflict(self, old, saved, new):
+ new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
+ new['floor'] = max(old['floor'], saved['floor'], new['floor'])
+ new['value'] = new['ceiling'] - new['floor']
+ return new
Globals.InitializeClass(TransientObjectContainer)
+
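As a worked illustration (not from the checkin) of the emergency test
the new code relies on: with a 20-second period, the container is in an
emergency bucket shortage exactly when its newest bucket is no newer
than the current timeslice, mirroring _in_emergency_bucket_shortage
above:

period = 20
now = 1000                           # current timeslice
max_ts = 1040                        # newest bucket; spares remain
print max_ts/period <= now/period    # False (52 > 50): no emergency
max_ts = 1000                        # newest bucket is the current one
print max_ts/period <= now/period    # True: replentish immediately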
=== Products/Transience/TransientObject.py 1.9.68.4 => 1.9.68.5 ===
--- Products/Transience/TransientObject.py:1.9.68.4 Sun Sep 5 01:57:07 2004
+++ Products/Transience/TransientObject.py Fri Sep 17 22:58:19 2004
@@ -193,71 +193,59 @@
# Other non interface code
#
- def _p_independent(self):
- # My state doesn't depend on or materially effect the state of
- # other objects (eliminates read conflicts).
- return 1
-
def _p_resolveConflict(self, saved, state1, state2):
DEBUG and TLOG('entering TO _p_rc')
DEBUG and TLOG('states: sv: %s, s1: %s, s2: %s' % (
saved, state1, state2))
- try:
- states = [saved, state1, state2]
+ states = [saved, state1, state2]
- # We can clearly resolve the conflict if one state is invalid,
- # because it's a terminal state.
- for state in states:
- if state.has_key('_invalid'):
- DEBUG and TLOG('TO _p_rc: a state was invalid')
- return state
- # The only other times we can clearly resolve the conflict is if
- # the token, the id, or the creation time don't differ between
- # the three states, so we check that here. If any differ, we punt
- # by raising ConflictError.
- attrs = ['token', 'id', '_created']
- for attr in attrs:
- svattr = saved.get(attr)
- s1attr = state1.get(attr)
- s2attr = state2.get(attr)
- DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
- (attr, svattr, s1attr, s2attr))
- if not svattr==s1attr==s2attr:
- DEBUG and TLOG('TO _p_rc: cant resolve conflict')
- raise ConflictError
+ # We can clearly resolve the conflict if one state is invalid,
+ # because it's a terminal state.
+ for state in states:
+ if state.has_key('_invalid'):
+ DEBUG and TLOG('TO _p_rc: a state was invalid')
+ return state
- # Now we need to do real work.
- #
- # Data in our _container dictionaries might conflict. To make
- # things simple, we intentionally create a race condition where the
- # state which was last modified "wins". It would be preferable to
- # somehow merge our _containers together, but as there's no
- # generally acceptable way to union their states, there's not much
- # we can do about it if we want to be able to resolve this kind of
- # conflict.
+ # The only other times we can clearly resolve the conflict is if
+ # the token, the id, or the creation time don't differ between
+ # the three states, so we check that here. If any differ, we punt
+ # by raising ConflictError.
+ attrs = ['token', 'id', '_created']
+ for attr in attrs:
+ svattr = saved.get(attr)
+ s1attr = state1.get(attr)
+ s2attr = state2.get(attr)
+ DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
+ (attr, svattr, s1attr, s2attr))
+ if not svattr==s1attr==s2attr:
+ DEBUG and TLOG('TO _p_rc: cant resolve conflict')
+ raise ConflictError
- # We return the state which was most recently modified, if
- # possible.
- states.sort(lastmodified_sort)
- if states[0].get('_last_modified'):
- DEBUG and TLOG('TO _p_rc: returning last mod state')
- return states[0]
+ # Now we need to do real work.
+ #
+ # Data in our _container dictionaries might conflict. To make
+ # things simple, we intentionally create a race condition where the
+ # state which was last modified "wins". It would be preferable to
+ # somehow merge our _containers together, but as there's no
+ # generally acceptable way to union their states, there's not much
+ # we can do about it if we want to be able to resolve this kind of
+ # conflict.
- # If we can't determine which object to return on the basis
- # of last modification time (no state has been modified), we return
- # the object that was most recently accessed (last pulled out of
- # our parent). This will return an essentially arbitrary state if
- # all last_accessed values are equal.
- states.sort(lastaccessed_sort)
- DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+ # We return the state which was most recently modified, if
+ # possible.
+ states.sort(lastmodified_sort)
+ if states[0].get('_last_modified'):
+ DEBUG and TLOG('TO _p_rc: returning last mod state')
return states[0]
- except ConflictError:
- raise
- except:
- LOG('Transience', INFO,
- 'Conflict resolution error in TransientObject', '',
- sys.exc_info()
- )
+
+ # If we can't determine which object to return on the basis
+ # of last modification time (no state has been modified), we return
+ # the object that was most recently accessed (last pulled out of
+ # our parent). This will return an essentially arbitrary state if
+ # all last_accessed values are equal.
+ states.sort(lastaccessed_sort)
+ DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+ return states[0]
getName = getId # this is for SQLSession compatibility