[Zope-Checkins] CVS: Products/Transience - TransactionHelper.py:1.1.2.1 Transience.py:1.32.12.8.2.1
Chris McDonough
chrism at plope.com
Sat Sep 11 21:00:02 EDT 2004
Update of /cvs-repository/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv28212
Modified Files:
Tag: chrism-pre273-branch
Transience.py
Added Files:
Tag: chrism-pre273-branch
TransactionHelper.py
Log Message:
Add pre-2.7.3 changes on branch for review.
=== Added File Products/Transience/TransactionHelper.py ===
import time

class PreventTransactionCommit(Exception):
    pass

class UncommittableJar:
    """ A jar that cannot be committed """
    def __init__(self, reason):
        self.reason = reason
        self.time = time.time()

    def sort_key(self):
        # self.time is a float captured at construction, not a method
        return self.time

    def tpc_begin(self, transaction):
        pass

    def commit(self, obj, transaction):
        pass

    def tpc_vote(self, transaction):
        # vetoes the two-phase commit: voting always fails
        raise PreventTransactionCommit(self.reason)

class makeTransactionUncommittable:
    """
    - register an uncommittable object with the provided transaction
      which prevents the commit of that transaction
    """
    def __init__(self, transaction, reason):
        self._p_jar = UncommittableJar(reason)
        transaction.register(self)
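A minimal sketch of how this helper is intended to be used, assuming the
ZODB 3.2-era get_transaction() builtin; the wrapper function below is
illustrative, not part of the module:

    from TransactionHelper import makeTransactionUncommittable

    def refuse_to_commit(reason):
        # Registering the helper object gives the transaction a jar
        # whose tpc_vote() always raises PreventTransactionCommit,
        # so the transaction can only be aborted, never committed.
        makeTransactionUncommittable(get_transaction(), reason)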
=== Products/Transience/Transience.py 1.32.12.8 => 1.32.12.8.2.1 ===
--- Products/Transience/Transience.py:1.32.12.8 Sat Jul 3 20:21:01 2004
+++ Products/Transience/Transience.py Sat Sep 11 20:59:32 2004
@@ -33,10 +33,11 @@
from BTrees.Length import Length
from BTrees.OOBTree import OOBTree
from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
+from ZODB.POSException import ConflictError, ReadConflictError
from Persistence import Persistent
from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager, \
setSecurityManager
@@ -44,6 +45,7 @@
from zLOG import LOG, WARNING, INFO
from TransientObject import TransientObject
+from TransactionHelper import makeTransactionUncommittable
from Fake import FakeIOBTree
ADD_CONTAINER_PERM = 'Add Transient Object Container'
@@ -53,7 +55,7 @@
ACCESS_TRANSIENTS_PERM = 'Access Transient Objects'
MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
-SPARE_BUCKETS = 15 # minimum number of buckets to keep "spare"
+SPARE_BUCKETS = 1 # minimum number of buckets to keep "spare"
BUCKET_CLASS = OOBTree # constructor for buckets
DATA_CLASS = IOBTree # const for main data structure (timeslice->"bucket")
STRICT = os.environ.get('Z_TOC_STRICT', '')
@@ -206,7 +208,7 @@
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
- # _gc() and _replentish()).
+ # _replentish() and _gc()).
self._data = DATA_CLASS()
# populate _data with some number of buckets, each of which
@@ -232,6 +234,10 @@
# each expired item.
self._last_finalized_timeslice = Increaser(-self._period)
+ # '_last_gc_timeslice' is a value that indicates in which
+ # timeslice the garbage collection process was last run.
+ self._last_gc_timeslice = Increaser(-self._period)
+
# our "_length" is the number of "active" data objects in _data.
# it does not include items that are still kept in _data but need to
# be garbage collected.
@@ -269,15 +275,10 @@
bucket = self._data.get(0)
return bucket.get(k, default)
- # always call finalize
+ # do housekeeping
self._finalize(current_ts)
-
- # call gc and/or replentish on an only-as needed basis
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
-
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
+ self._replentish(current_ts)
+ self._gc(current_ts)
# SUBTLETY ALERT TO SELF: do not "improve" the code below
# unnecessarily, as it will end only in tears. The lack of aliases
@@ -348,13 +349,10 @@
else:
current_ts = 0
+ # do housekeeping
self._finalize(current_ts)
-
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
-
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
+ self._replentish(current_ts)
+ self._gc(current_ts)
STRICT and _assert(self._data.has_key(current_ts))
current = self._getCurrentSlices(current_ts)
@@ -374,8 +372,8 @@
def keys(self):
return self._all().keys()
- def rawkeys(self, current_ts):
- # for debugging
+ def raw(self, current_ts):
+ # for debugging and unit testing
current = self._getCurrentSlices(current_ts)
current.reverse() # overwrite older with newer
@@ -496,40 +494,11 @@
DEBUG and TLOG('has_key: returning false for %s' % k)
return False
- def _roll(self, now, reason):
- """
- Roll the dice to see if we're the lucky thread that does
- bucket replentishment or gc. This method is guaranteed to return
- true at some point as the difference between high and low naturally
- diminishes to zero.
-
- The reason we do the 'random' dance in the last part of this
- is to minimize the chance that two threads will attempt to
- do housekeeping at the same time (causing conflicts).
- """
- low = now/self._period
- high = self._max_timeslice()/self._period
- if high <= low:
- # we really need to win this roll because we have no
- # spare buckets (and no valid values to provide to randrange), so
- # we rig the toss.
- DEBUG and TLOG('_roll: %s rigged toss' % reason)
- return True
- else:
- # we're not in an emergency bucket shortage, so we can
- # take our chances during the roll. It's unlikely that
- # two threads will win the roll simultaneously, so we
- # avoid a certain class of conflicts here.
- if random.randrange(low, high) == low: # WINNAH!
- DEBUG and TLOG("_roll: %s roll winner" % reason)
- return True
- DEBUG and TLOG("_roll: %s roll loser" % reason)
- return False
-
def _get_max_expired_ts(self, now):
return now - (self._period * (self._timeout_slices + 1))
def _finalize(self, now):
+ """ Call finalization handlers for the data in each stale bucket """
if not self._timeout_slices:
DEBUG and TLOG('_finalize: doing nothing (no timeout)')
return # don't do any finalization if there is no timeout
@@ -555,18 +524,8 @@
now = getCurrentTimeslice(self._period) # for unit tests
# we want to start finalizing from one timeslice after the
- # timeslice which we last finalized. Note that finalizing
- # an already-finalized bucket somehow sends persistence
- # into a spin with an exception later raised:
- # "SystemError: error return without exception set",
- # typically coming from
- # Products.Sessions.SessionDataManager, line 182, in
- # _getSessionDataObject (if getattr(ob, '__of__', None)
- # and getattr(ob, 'aq_parent', None)). According to this
- # email message from Jim, it may be because the ob is
- # ghosted and doesn't have a _p_jar somehow:
- #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
-
+ # timeslice which we last finalized.
+
start_finalize = self._last_finalized_timeslice() + self._period
# we want to finalize only up to the maximum expired timeslice
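The _roll method removed above encodes a pattern that survives inside the
new _replentish: a randomized election whose odds of winning rise to
certainty as the spare-bucket cushion shrinks. A standalone sketch of the
idea (names illustrative, integer timeslices assumed):

    import random

    def should_do_housekeeping(now, max_ts, period):
        low = now / period          # current timeslice index
        high = max_ts / period      # newest bucket's timeslice index
        if high <= low:
            return True             # no spare buckets: rig the toss
        # one chance in (high - low) of winning; it is unlikely that
        # two threads win simultaneously, which avoids write conflicts
        return random.randrange(low, high) == low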
@@ -612,63 +571,129 @@
self.finalize_lock.release()
def _replentish(self, now):
- # available_spares == the number of "spare" buckets that exist in
- # "_data"
+ """ Add 'fresh' future or current buckets """
if not self._timeout_slices:
return # do nothing if no timeout
+ max_ts = self._max_timeslice()
+
+ low = now/self._period
+ high = max_ts/self._period
+
+ # the difference between high and low naturally diminishes to
+ # zero as now approaches self._max_timeslice() during normal
+ # operations. If high <= low, it means we have no current bucket,
+ # so we *really* need to replentish (having a current bucket is
+ # an invariant for continued operation).
+
+ optional = not (high <= low)
+
if not self.replentish_lock.acquire(0):
- DEBUG and TLOG('_replentish: couldnt acquire lock')
- return
- try:
- max_ts = self._max_timeslice()
- available_spares = (max_ts-now) / self._period
- DEBUG and TLOG('_replentish: now = %s' % now)
- DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
- DEBUG and TLOG('_replentish: available_spares = %s'
- % available_spares)
-
- if available_spares >= SPARE_BUCKETS:
- DEBUG and TLOG('_replentish: available_spares (%s) >= '
- 'SPARE_BUCKETS (%s), doing '
- 'nothing'% (available_spares,
- SPARE_BUCKETS))
+ if not optional:
+ DEBUG and TLOG('_replentish: no current bucket but cant aq '
+ 'lock, making txn uncommittable + retry')
+ # Out of paranoia, make this transaction uncommittable;
+ # this may not be necessary.
+ makeTransactionUncommittable(
+ get_transaction(), "Transience had no current bucket")
+ #time.sleep(random.uniform(0, 1)) # add entropy
+ raise Retry
+
+ else:
+ DEBUG and TLOG('_replentish: couldnt acquire lock, returning')
return
- if max_ts < now:
- replentish_start = now
- replentish_end = now + (self._period * SPARE_BUCKETS)
+ try:
+ if optional:
+ DEBUG and TLOG('_replentish: attempting optional replentish')
+ # We're not in an emergency bucket shortage, so we don't
+ # explicitly need to replentish with fresh new buckets.
+ # Minimize the chance that two threads will attempt to
+ # do housekeeping at the same time (which causes conflicts)
+ # by introducing a random element.
+ if random.randrange(low, high) != low: # lost the toss; do nothing
+ DEBUG and TLOG('_replentish: lost random selection '
+ 'in optional replentish, returning')
+ return
+ else:
+ DEBUG and TLOG('_replentish: won random selection '
+ 'in optional replentish, continuing')
+ self._do_replentish_work(now, max_ts)
else:
- replentish_start = max_ts + self._period
- replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+ # we're in an emergency bucket shortage, we need to replentish
+ DEBUG and TLOG('_replentish: forcing replentish '
+ '(no current bucket)')
+ self._do_replentish_work(now, max_ts)
- DEBUG and TLOG('_replentish: replentish_start = %s' %
- replentish_start)
- DEBUG and TLOG('_replentish: replentish_end = %s'
- % replentish_end)
- # n is the number of buckets to create
- n = (replentish_end - replentish_start) / self._period
- new_buckets = getTimeslices(replentish_start, n, self._period)
- new_buckets.reverse()
- STRICT and _assert(new_buckets)
- DEBUG and TLOG('_replentish: adding %s new buckets' % n)
- DEBUG and TLOG('_replentish: buckets to add = %s'
- % new_buckets)
- for k in new_buckets:
- STRICT and _assert(not self._data.has_key(k))
- try:
- self._data[k] = BUCKET_CLASS()
- except ConflictError:
- DEBUG and TLOG('_replentish: conflict when adding %s' % k)
- time.sleep(random.uniform(0, 1)) # add entropy
- raise
- self._max_timeslice.set(max(new_buckets))
finally:
self.replentish_lock.release()
+ def _do_replentish_work(self, now, max_ts):
+ # this is only separated from _replentish for readability;
+ # it shouldn't be called without the replentish lock being held
+
+ # available_spares == the number of "spare" buckets that exist in
+ # "_data"
+ available_spares = (max_ts - now) / self._period
+ DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+ DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+ DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+ % available_spares)
+
+ if available_spares >= SPARE_BUCKETS:
+ DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+ 'SPARE_BUCKETS (%s), doing '
+ 'nothing'% (available_spares,
+ SPARE_BUCKETS))
+ return
+
+ if max_ts < now:
+ # the newest bucket in self._data is older than now!
+ replentish_start = now
+ replentish_end = now + (self._period * SPARE_BUCKETS)
+
+ else:
+ replentish_start = max_ts + self._period
+ replentish_end = max_ts + (self._period * (SPARE_BUCKETS +1))
+
+ DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+ replentish_start)
+ DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+ % replentish_end)
+ # n is the number of buckets to create
+ n = (replentish_end - replentish_start) / self._period
+ new_buckets = getTimeslices(replentish_start, n, self._period)
+ new_buckets.reverse()
+ STRICT and _assert(new_buckets)
+ DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+ DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+ % new_buckets)
+ for k in new_buckets:
+ STRICT and _assert(not self._data.has_key(k))
+
+ # this is a conflict hotspot
+ try:
+ self._data[k] = BUCKET_CLASS()
+ except ConflictError:
+ DEBUG and TLOG('_do_replentish_work: conflict when adding %s' %
+ k)
+ # Out of paranoia, make this transaction uncommittable;
+ # this is a fatal error and we need to retry the request
+ # to get back to a sane state but we haven't set max_timeslice
+ # yet, so an exception handler that catches this will screw us.
+ # For ZODB 3.2, this will at least prevent our data invariants
+ # from getting screwed up even if the exception is caught.
+ makeTransactionUncommittable(
+ get_transaction(),
+ "conflict error in Transience _do_replentish_work")
+ raise
+
+ self._max_timeslice.set(max(new_buckets))
+
def _gc(self, now=None):
+ """ Remove stale buckets """
if not self._timeout_slices:
return # dont do gc if there is no timeout
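To make the bucket arithmetic in _do_replentish_work above concrete, here
is a sketch of the planning step under the same invariants (illustrative
names, integer timeslices assumed):

    def plan_new_buckets(now, max_ts, period, spares):
        if max_ts < now:
            # newest existing bucket is already stale; restart at "now"
            start = now
            end = now + period * spares
        else:
            start = max_ts + period
            end = max_ts + period * (spares + 1)
        n = (end - start) / period   # number of buckets to create
        return [start + i * period for i in range(n)]

E.g. with period=20, SPARE_BUCKETS=1, now=1000 and max_ts=1000, this plans
exactly one fresh bucket keyed 1020.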
@@ -676,15 +701,25 @@
DEBUG and TLOG('_gc: couldnt acquire lock')
return
- try:
+ try:
if now is None:
now = getCurrentTimeslice(self._period) # for unit tests
- # we want to garbage collect all buckets that have already been run
+ last_gc = self._last_gc_timeslice()
+ gc_every = self._period * SPARE_BUCKETS
+
+ if (now - last_gc) < gc_every:
+ DEBUG and TLOG('_gc: gc attempt not yet required '
+ '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+ return
+
+ DEBUG and TLOG('_gc: gc attempt proceeding')
+
+ # we garbage collect any buckets that have already been run
# through finalization
+
max_ts = self._last_finalized_timeslice()
- DEBUG and TLOG('_gc: now is %s' % now)
DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
for key in list(self._data.keys(None, max_ts)):
@@ -692,6 +727,9 @@
STRICT and _assert(self._data.has_key(key))
DEBUG and TLOG('deleting %s from _data' % key)
del self._data[key]
+
+ self._last_gc_timeslice.set(now)
+
finally:
self.gc_lock.release()
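The new _last_gc_timeslice gate above throttles collection to at most one
attempt per SPARE_BUCKETS' worth of timeslices; the gate in miniature
(illustrative):

    def gc_is_due(now, last_gc, period, spares):
        # skip gc until at least period * spares has elapsed
        # since the last successful collection
        return (now - last_gc) >= period * spares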
@@ -834,8 +872,10 @@
def nudge(self):
""" Used by mgmt interface to maybe do housekeeping each time
a screen is shown """
- # run garbage collector so view is correct
- self._gc()
+ now = getCurrentTimeslice(self._period)
+ self._finalize(now)
+ self._replentish(now)
+ self._gc(now)
security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer')
@@ -891,6 +931,10 @@
if not state.has_key('_last_finalized_timeslice'):
self._last_finalized_timeslice = Increaser(-self._period)
+ # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+ if not state.has_key('_last_gc_timeslice'):
+ self._last_gc_timeslice = Increaser(-self._period)
+
# we should probably delete older attributes from state such as
# '_last_timeslice', '_deindex_next', and '__len__' here but we leave
# them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -947,7 +991,7 @@
def _p_resolveConflict(self, old, state1, state2):
return max(old, state1, state2)
- def _p_independent(self):
- return 1
+## def _p_independent(self):
+## return 1
Globals.InitializeClass(TransientObjectContainer)
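For context on the last two hunks: Increaser._p_resolveConflict merges
concurrent commits by keeping the largest of the competing states, so
simultaneous writers never roll each other back, and commenting out
_p_independent presumably lets read conflicts on these counters surface
again (note the new ReadConflictError import). The merge rule in
miniature, with made-up values:

    old, committed, current = 100, 120, 140  # three competing states
    resolved = max(old, committed, current)  # -> 140: the writer that
                                             # advanced furthest wins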