[Zope-Checkins] CVS: Products/Transience - TransactionHelper.py:1.1.4.1 HowTransienceWorks.stx:1.1.74.3 Transience.py:1.32.12.9 TransientObject.py:1.9.68.5

Chris McDonough chrism at plope.com
Fri Sep 17 22:58:49 EDT 2004


Update of /cvs-repository/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv394

Modified Files:
      Tag: Zope-2_7-branch
	HowTransienceWorks.stx Transience.py TransientObject.py 
Added Files:
      Tag: Zope-2_7-branch
	TransactionHelper.py 
Log Message:
Merge chrism-pre273-branch to Zope-2_7-branch.  This checkin plus others
made to the ZODB 3.2 branch as included with the Zope 2.7 branch fix all
known sessioning issues to date.

Changes:

- TransientObject conflict resolver could potentially fail; when it
  failed, the conflict resolution machinery could resolve the
  TransientObject to None.  (never reported)

- Add a knob (not exposed to UI) to turn off "inband" housekeeping
  Housekeeping can now optionally be done using an external scheduling 
  facility by calling the "housekeep" method regularly.

- Break out actual work that _gc and _finalize do into separate _do methods
  for legibility.

- Dont raise Retry in _replentish if we're in a bucket shortage and we can't
  get the lock. Instead just soldier on and let the conflict happen naturally.

- Create a "roll" function and attempt to prevent conflicts in _gc by using a
  roll.

- Remove "nudge" function in favor of "housekeep".

- Indicators for gc required and replentish required are simpler.

- Replace BTrees.Length.Length with dunny's Length (not _p_independent) in
  order to get "number of objects in data container" right.




=== Added File Products/Transience/TransactionHelper.py ===
import time

class PreventTransactionCommit(Exception):
    def __init__(self, reason):
        self. reason = reason

    def __str__(self):
        return "Uncommittable transaction: " % self.reason
    
class UncommittableJar:
    """ A jar that cannot be committed """
    def __init__(self, reason):
        self.reason = reason
        self.time = time.time()
        
    def sort_key(self):
        return self.time()

    def tpc_begin(self, transaction):
        pass

    def commit(self, obj, transaction):
        pass

    def tpc_vote(self, transaction):
        raise PreventTransactionCommit(self.reason)

class makeTransactionUncommittable:
    """
    - register an uncommittable object with the provided transaction
      which prevents the commit of that transaction
    """
    def __init__(self, transaction, reason):
        self._p_jar = UncommittableJar(reason)
        transaction.register(self)
        


=== Products/Transience/HowTransienceWorks.stx 1.1.74.2 => 1.1.74.3 ===
--- Products/Transience/HowTransienceWorks.stx:1.1.74.2	Sun May 30 03:56:35 2004
+++ Products/Transience/HowTransienceWorks.stx	Fri Sep 17 22:58:19 2004
@@ -42,6 +42,7 @@
     inside of the "_data" structure.  There is a concept of a
     "current" bucket, which is the bucket that is contained within the
     _data structured with a key equal to the "current" timeslice.
+    A current bucket must always exist (this is an invariant).
 
   - A "max_timeslice" integer, which is equal to the "largest"
     timeslice for which there exists a bucket in the _data structure.
@@ -74,10 +75,13 @@
 Replentishing
 
   The TOC performs "finalization", "garbage collection", and "bucket
-  replentishing".  It performs these tasks "in-band".  This means that
-  the TOC does not maintain a separate thread that wakes up every so
-  often to do these housekeeping tasks.  Instead, during the course of
-  normal operations, the TOC opportunistically performs them.
+  replentishing".  It typically performs these tasks "in-band"
+  (although it is possible to do the housekeeping tasks "out of band"
+  as well: see the methods of the Transient Object Container with
+  "housekeep" in their names).  "In band" housekeeping implies that
+  the TOC does not maintain a separate thread or process that wakes up
+  every so often to clean up.  Instead, during the course of normal
+  operations, the TOC opportunistically performs housekeeping functions.
 
   Finalization is defined as optionally calling a function at bucket
   expiration time against all transient objects contained within that


=== Products/Transience/Transience.py 1.32.12.8 => 1.32.12.9 ===
--- Products/Transience/Transience.py:1.32.12.8	Sat Jul  3 20:21:01 2004
+++ Products/Transience/Transience.py	Fri Sep 17 22:58:19 2004
@@ -30,13 +30,13 @@
      TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
      StringKeyedHomogeneousItemContainer, TransientItemContainer
 
-from BTrees.Length import Length
+from BTrees.Length import Length as BTreesLength
 from BTrees.OOBTree import OOBTree
 from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
 
 from Persistence import Persistent
 from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
 from AccessControl import ClassSecurityInfo, getSecurityManager
 from AccessControl.SecurityManagement import newSecurityManager, \
      setSecurityManager
@@ -129,6 +129,7 @@
 
     _limit = 0
     _data = None
+    _inband_housekeeping = True
 
     security.setDefaultAccess('deny')
 
@@ -206,7 +207,7 @@
         # We make enough buckets initially to last us a while, and
         # we subsequently extend _data with fresh buckets and remove old
         # buckets as necessary during normal operations (see
-        # _gc() and _replentish()).
+        # _replentish() and _gc()).
         self._data = DATA_CLASS()
 
         # populate _data with some number of buckets, each of which
@@ -232,6 +233,10 @@
         # each expired item.
         self._last_finalized_timeslice = Increaser(-self._period)
 
+        # '_last_gc_timeslice' is a value that indicates in which
+        # timeslice the garbage collection process was last run.
+        self._last_gc_timeslice = Increaser(-self._period)
+        
         # our "_length" is the number of "active" data objects in _data.
         # it does not include items that are still kept in _data but need to
         # be garbage collected.
@@ -239,8 +244,10 @@
         # we need to maintain the length of the index structure separately
         # because getting the length of a BTree is very expensive, and it
         # doesn't really tell us which ones are "active" anyway.
-        try: self._length.set(0)
-        except AttributeError: self._length = self.getLen = Length()
+        try:
+            self._length.set(0)
+        except AttributeError:
+            self._length = self.getLen = Length2()
 
     def _getCurrentSlices(self, now):
         if self._timeout_slices:
@@ -269,15 +276,14 @@
             bucket = self._data.get(0)
             return bucket.get(k, default)
 
-        # always call finalize
-        self._finalize(current_ts)
-
-        # call gc and/or replentish on an only-as needed basis
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
 
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
+        else:
+            # dont allow the TOC to stop working in an emergency bucket
+            # shortage
+            if self._in_emergency_bucket_shortage(current_ts):
+                self._replentish(current_ts)
 
         # SUBTLETY ALERTY TO SELF: do not "improve" the code below
         # unnecessarily, as it will end only in tears.  The lack of aliases
@@ -288,7 +294,8 @@
         found_ts = None
 
         for ts in current_slices:
-            abucket = self._data.get(ts, None)
+            abucket = self._data.get(ts, None) # XXX ReadConflictError hotspot
+
             if abucket is None:
                 DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
                 continue
@@ -348,13 +355,12 @@
         else:
             current_ts = 0
 
-        self._finalize(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
 
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
-
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
+        elif self._in_emergency_bucket_shortage(current_ts):
+            # if our scheduler fails, dont allow the TOC to stop working
+            self._replentish(current_ts, force=True)
 
         STRICT and _assert(self._data.has_key(current_ts))
         current = self._getCurrentSlices(current_ts)
@@ -374,8 +380,8 @@
     def keys(self):
         return self._all().keys()
 
-    def rawkeys(self, current_ts):
-        # for debugging
+    def raw(self, current_ts):
+        # for debugging and unit testing
         current = self._getCurrentSlices(current_ts)
 
         current.reverse() # overwrite older with newer
@@ -425,15 +431,20 @@
         STRICT and _assert(self._data.has_key(current_ts))
         if item is _marker:
             # the key didnt already exist, this is a new item
-            if self._limit and len(self) >= self._limit:
+
+            length = self._length() # XXX ReadConflictError hotspot
+
+            if self._limit and length >= self._limit:
                 LOG('Transience', WARNING,
                     ('Transient object container %s max subobjects '
                      'reached' % self.getId())
                     )
                 raise MaxTransientObjectsExceeded, (
                  "%s exceeds maximum number of subobjects %s" %
-                 (len(self), self._limit))
-            self._length.change(1)
+                 (length, self._limit))
+
+            self._length.increment(1)
+
         DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' %
                        (k, current_ts))
         current_bucket = self._data[current_ts]
@@ -460,7 +471,11 @@
         if not issubclass(BUCKET_CLASS, Persistent):
             # tickle persistence machinery
             self._data[current_ts] = bucket
-        self._length.change(-1)
+
+        # XXX does increment(-1) make any sense here?
+        # rationale from dunny: we are removing an item rather than simply
+        # declaring it to be unused?
+        self._length.increment(-1)
         return current_ts, item
 
     def __len__(self):
@@ -496,78 +511,45 @@
         DEBUG and TLOG('has_key: returning false from for %s' % k)
         return False
 
-    def _roll(self, now, reason):
-        """
-        Roll the dice to see if we're the lucky thread that does
-        bucket replentishment or gc.  This method is guaranteed to return
-        true at some point as the difference between high and low naturally
-        diminishes to zero.
-
-        The reason we do the 'random' dance in the last part of this
-        is to minimize the chance that two threads will attempt to
-        do housekeeping at the same time (causing conflicts).
-        """
-        low = now/self._period
-        high = self._max_timeslice()/self._period
-        if high <= low:
-            # we really need to win this roll because we have no
-            # spare buckets (and no valid values to provide to randrange), so
-            # we rig the toss.
-            DEBUG and TLOG('_roll: %s rigged toss' % reason)
-            return True
-        else:
-            # we're not in an emergency bucket shortage, so we can
-            # take our chances during the roll.  It's unlikely that
-            # two threads will win the roll simultaneously, so we
-            # avoid a certain class of conflicts here.
-            if random.randrange(low, high) == low: # WINNAH!
-                DEBUG and TLOG("_roll: %s roll winner" % reason)
-                return True
-        DEBUG and TLOG("_roll: %s roll loser" % reason)
-        return False
-
     def _get_max_expired_ts(self, now):
         return now - (self._period * (self._timeout_slices + 1))
 
+    def _in_emergency_bucket_shortage(self, now):
+        max_ts = self._max_timeslice()
+        low = now/self._period
+        high = max_ts/self._period
+        required = high <= low
+        return required
+
     def _finalize(self, now):
+        """ Call finalization handlers for the data in each stale bucket """
         if not self._timeout_slices:
             DEBUG and TLOG('_finalize: doing nothing (no timeout)')
             return # don't do any finalization if there is no timeout
 
         # The nature of sessioning is that when the timeslice rolls
         # over, all active threads will try to do a lot of work during
-        # finalization, all but one unnecessarily.  We really don't
-        # want more than one thread at a time to try to finalize
-        # buckets at the same time so we try to lock. We give up if we
-        # can't lock immediately because it doesn't matter if we skip
-        # a couple of opportunities for finalization, as long as it
-        # gets done by some thread eventually.  A similar pattern
-        # exists for _gc and _replentish.
+        # finalization if inband housekeeping is enabled, all but one
+        # unnecessarily.  We really don't want more than one thread at
+        # a time to try to finalize buckets at the same time so we try
+        # to lock. We give up if we can't lock immediately because it
+        # doesn't matter if we skip a couple of opportunities for
+        # finalization, as long as it gets done by some thread
+        # eventually.  A similar pattern exists for _gc and
+        # _replentish.
 
         if not self.finalize_lock.acquire(0):
-            DEBUG and TLOG('_finalize: couldnt acquire lock')
+            DEBUG and TLOG('_finalize: could not acquire lock, returning')
             return
 
         try:
             DEBUG and TLOG('_finalize: lock acquired successfully')
-
-            if now is None:
-                now = getCurrentTimeslice(self._period) # for unit tests
+            last_finalized = self._last_finalized_timeslice()
 
             # we want to start finalizing from one timeslice after the
-            # timeslice which we last finalized.  Note that finalizing
-            # an already-finalized bucket somehow sends persistence
-            # into a spin with an exception later raised:
-            # "SystemError: error return without exception set",
-            # typically coming from
-            # Products.Sessions.SessionDataManager, line 182, in
-            # _getSessionDataObject (if getattr(ob, '__of__', None)
-            # and getattr(ob, 'aq_parent', None)). According to this
-            # email message from Jim, it may be because the ob is
-            # ghosted and doesn't have a _p_jar somehow:
-            #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
-
-            start_finalize  = self._last_finalized_timeslice() + self._period
+            # timeslice which we last finalized.
+            
+            start_finalize  = last_finalized + self._period
 
             # we want to finalize only up to the maximum expired timeslice
             max_ts = self._get_max_expired_ts(now)
@@ -577,124 +559,221 @@
                     '_finalize: start_finalize (%s) >= max_ts (%s), '
                     'doing nothing' % (start_finalize, max_ts))
                 return
-        
-            DEBUG and TLOG('_finalize: now is %s' % now)
-            DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
-            DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
-
-            to_finalize = list(self._data.keys(start_finalize, max_ts))
-            DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
-
-            delta = 0
-
-            for key in to_finalize:
+            else:
+                DEBUG and TLOG(
+                    '_finalize: start_finalize (%s) <= max_ts (%s), '
+                    'finalization possible' % (start_finalize, max_ts))
+                # we don't try to avoid conflicts here by doing a "random"
+                # dance (ala _replentish and _gc) because it's important that
+                # buckets are finalized as soon as possible after they've
+                # expired in order to call the delete notifier "on time".
+                self._do_finalize_work(now, max_ts, start_finalize)
 
-                assert(start_finalize <= key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                values = list(self._data[key].values())
-                DEBUG and TLOG('_finalize: values to notify from ts %s '
-                               'are %s' % (key, `list(values)`))
+        finally:
+            self.finalize_lock.release()
 
-                delta += len(values)
+    def _do_finalize_work(self, now, max_ts, start_finalize):
+        # this is only separated from _finalize for readability; it
+        # should generally not be called by anything but _finalize
+        DEBUG and TLOG('_do_finalize_work: entering')
+        DEBUG and TLOG('_do_finalize_work: now is %s' % now)
+        DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
+        DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
+                       start_finalize)
+
+        to_finalize = list(self._data.keys(start_finalize, max_ts))
+        DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
+
+        delta = 0
+
+        for key in to_finalize:
+
+            _assert(start_finalize <= key)
+            _assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            values = list(self._data[key].values())
+            DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
+                           'are %s' % (key, `list(values)`))
+
+            delta += len(values)
+
+            for v in values:
+                self.notifyDel(v)
+
+        if delta:
+            self._length.decrement(delta)
+
+        DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
+                       'to max_ts of %s' % max_ts)
+
+        self._last_finalized_timeslice.set(max_ts)
+
+    def _invoke_finalize_and_gc(self):
+        # for unit testing purposes only!
+        last_finalized = self._last_finalized_timeslice()
+        now = getCurrentTimeslice(self._period) # for unit tests
+        start_finalize  = last_finalized + self._period
+        max_ts = self._get_max_expired_ts(now)
+        self._do_finalize_work(now, max_ts, start_finalize)
+        self._do_gc_work(now)
 
-                for v in values:
-                    self.notifyDel(v)
+    def _replentish(self, now):
+        """ Add 'fresh' future or current buckets """
+        if not self._timeout_slices:
+            DEBUG and TLOG('_replentish: no timeout, doing nothing')
+            return
+        
+        # the difference between high and low naturally diminishes to
+        # zero as now approaches self._max_timeslice() during normal
+        # operations.  If high <= low, it means we have no current bucket,
+        # so we *really* need to replentish (having a current bucket is
+        # an invariant for continued operation).
 
-            if delta:
-                self._length.change(-delta)
+        required = self._in_emergency_bucket_shortage(now)
+        lock_acquired = self.replentish_lock.acquire(0)
 
-            DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
-                           'to max_ts of %s' % max_ts)
+        try:
+            if required:
+                # we're in an emergency bucket shortage, we need to
+                # replentish regardless of whether we got the lock or
+                # not.  (if we didn't get the lock, this transaction
+                # will likely result in a conflict error, that's ok)
+                if lock_acquired:
+                    DEBUG and TLOG('_replentish: required, lock acquired)')
+                else:
+                    DEBUG and TLOG('_replentish: required, lock NOT acquired)')
+                max_ts = self._max_timeslice()
+                self._do_replentish_work(now, max_ts)
+
+            elif lock_acquired:
+                # If replentish is optional, minimize the chance that
+                # two threads will attempt to do replentish work at
+                # the same time (which causes conflicts) by
+                # introducing a random element.
+                DEBUG and TLOG('_replentish: attempting optional replentish '
+                               '(lock acquired)')
+                max_ts = self._max_timeslice()
+                low = now/self._period
+                high = max_ts/self._period
+                if roll(low, high, 'optional replentish'):
+                    self._do_replentish_work(now, max_ts)
 
-            self._last_finalized_timeslice.set(max_ts)
+            else:
+                # This is an optional replentish and we can't acquire
+                # the lock, bail.
+                DEBUG and TLOG('_optional replentish attempt aborted, could '
+                               'not acquire lock.')
+                return
 
         finally:
-            self.finalize_lock.release()
+            if lock_acquired:
+                self.replentish_lock.release()
 
-    def _replentish(self, now):
-        # available_spares == the number of "spare" buckets that exist in
-        # "_data"
-        if not self._timeout_slices:
-            return # do nothing if no timeout
-        
-        if not self.replentish_lock.acquire(0):
-            DEBUG and TLOG('_replentish: couldnt acquire lock')
+    def _do_replentish_work(self, now, max_ts):
+        DEBUG and TLOG('_do_replentish_work: entering')
+        # this is only separated from _replentish for readability; it
+        # should generally not be called by anything but _replentish
+
+        # available_spares == the number of "spare" buckets that exist
+        # in "_data"
+        available_spares = (max_ts - now) / self._period
+        DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+        DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+        DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+                       % available_spares)
+
+        if available_spares >= SPARE_BUCKETS:
+            DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+                           'SPARE_BUCKETS (%s), doing '
+                           'nothing'% (available_spares,
+                                       SPARE_BUCKETS))
             return
 
-        try:
-            max_ts = self._max_timeslice()
-            available_spares = (max_ts-now) / self._period
-            DEBUG and TLOG('_replentish: now = %s' % now)
-            DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
-            DEBUG and TLOG('_replentish: available_spares = %s'
-                           % available_spares)
-
-            if available_spares >= SPARE_BUCKETS:
-                DEBUG and TLOG('_replentish: available_spares (%s) >= '
-                               'SPARE_BUCKETS (%s), doing '
-                               'nothing'% (available_spares,
-                                           SPARE_BUCKETS))
-                return
+        if max_ts < now:
+            # the newest bucket in self._data is older than now!
+            replentish_start = now
+            replentish_end = now + (self._period * SPARE_BUCKETS)
 
-            if max_ts < now:
-                replentish_start = now
-                replentish_end = now + (self._period * SPARE_BUCKETS)
+        else:
+            replentish_start = max_ts + self._period
+            replentish_end = max_ts + (self._period * (SPARE_BUCKETS +1))
 
-            else:
-                replentish_start = max_ts + self._period
-                replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+        DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+                       replentish_start)
+        DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+                       % replentish_end)
+        # n is the number of buckets to create
+        n = (replentish_end - replentish_start) / self._period
+        new_buckets = getTimeslices(replentish_start, n, self._period)
+        new_buckets.reverse()
+        STRICT and _assert(new_buckets)
+        DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+        DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+                       % new_buckets)
+        for k in new_buckets:
+            STRICT and _assert(not self._data.has_key(k))
+            self._data[k] = BUCKET_CLASS() # XXX ReadConflictError hotspot
 
-            DEBUG and TLOG('_replentish: replentish_start = %s' %
-                           replentish_start)
-            DEBUG and TLOG('_replentish: replentish_end = %s'
-                           % replentish_end)
-            # n is the number of buckets to create
-            n = (replentish_end - replentish_start) / self._period
-            new_buckets = getTimeslices(replentish_start, n, self._period)
-            new_buckets.reverse()
-            STRICT and _assert(new_buckets)
-            DEBUG and TLOG('_replentish: adding %s new buckets' % n)
-            DEBUG and TLOG('_replentish: buckets to add = %s'
-                           % new_buckets)
-            for k in new_buckets:
-                STRICT and _assert(not self._data.has_key(k))
-                try:
-                    self._data[k] = BUCKET_CLASS()
-                except ConflictError:
-                    DEBUG and TLOG('_replentish: conflict when adding %s' % k)
-                    time.sleep(random.uniform(0, 1)) # add entropy
-                    raise
-            self._max_timeslice.set(max(new_buckets))
-        finally:
-            self.replentish_lock.release()
+        self._max_timeslice.set(max(new_buckets))
 
     def _gc(self, now=None):
+        """ Remove stale buckets """
         if not self._timeout_slices:
             return # dont do gc if there is no timeout
 
+        # give callers a good chance to do nothing (gc isn't as important
+        # as replentishment or finalization)
+        if not roll(0, 5, 'gc'):
+            DEBUG and TLOG('_gc: lost roll, doing nothing')
+            return
+
         if not self.gc_lock.acquire(0):
             DEBUG and TLOG('_gc: couldnt acquire lock')
             return
 
-        try:
+        try: 
             if now is None:
                 now = getCurrentTimeslice(self._period) # for unit tests
 
-            # we want to garbage collect all buckets that have already been run
-            # through finalization
-            max_ts = self._last_finalized_timeslice()
-
-            DEBUG and TLOG('_gc: now is %s' % now)
-            DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
-
-            for key in list(self._data.keys(None, max_ts)):
-                assert(key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                DEBUG and TLOG('deleting %s from _data' % key)
-                del self._data[key]
+            last_gc = self._last_gc_timeslice()
+            gc_every = self._period * round(SPARE_BUCKETS / 2.0)
+
+            if (now - last_gc) < gc_every:
+                DEBUG and TLOG('_gc: gc attempt not yet required '
+                               '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+                return
+            else:
+                DEBUG and TLOG(
+                    '_gc:  (%s -%s) > %s, gc invoked' % (now, last_gc,
+                                                          gc_every))
+                self._do_gc_work(now)
+
         finally:
             self.gc_lock.release()
 
+    def _do_gc_work(self, now):
+        # this is only separated from _gc for readability; it should
+        # generally not be called by anything but _gc
+
+        # we garbage collect any buckets that have already been run
+        # through finalization
+        DEBUG and TLOG('_do_gc_work: entering')
+
+        max_ts = self._last_finalized_timeslice()
+
+        DEBUG and TLOG('_do_gc_work: max_ts is %s' % max_ts)
+        to_gc = list(self._data.keys(None, max_ts))
+        DEBUG and TLOG('_do_gc_work: to_gc is: %s' % str(to_gc))
+
+        for key in to_gc:
+            _assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            DEBUG and TLOG('_do_gc_work: deleting %s from _data' % key)
+            del self._data[key]
+
+        DEBUG and TLOG('_do_gc_work: setting last_gc_timeslice to %s' % now)
+        self._last_gc_timeslice.set(now)
+
     def notifyAdd(self, item):
         DEBUG and TLOG('notifyAdd with %s' % item)
         callback = self._getCallback(self._addCallback)
@@ -830,12 +909,36 @@
     def setDelNotificationTarget(self, f):
         self._delCallback = f
 
-    security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
-    def nudge(self):
-        """ Used by mgmt interface to maybe do housekeeping each time
-        a screen is shown """
-        # run garbage collector so view is correct
-        self._gc()
+    security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
+    def disableInbandHousekeeping(self):
+        """ No longer perform inband housekeeping """
+        self._inband_housekeeping = False
+
+    security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
+    def enableInbandHousekeeping(self):
+        """ (Re)enable inband housekeeping """
+        self._inband_housekeeping = True
+
+    security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
+    def isInbandHousekeepingEnabled(self):
+        """ Report if inband housekeeping is enabled """
+        return self._inband_housekeeping
+
+    security.declareProtected('View', 'housekeep')
+    def housekeep(self):
+        """ Call this from a scheduler at least every
+        self._period * (SPARE_BUCKETS - 1) seconds to perform out of band
+        housekeeping """
+        # we can protect this method from being called too often by
+        # anonymous users as necessary in the future; we already have a lot
+        # of protection as-is though so no need to make it more complicated
+        # than necessary at the moment
+        self._housekeep(getCurrentTimeslice(self._period))
+
+    def _housekeep(self, now):
+        self._finalize(now)
+        self._replentish(now)
+        self._gc(now)
 
     security.declareProtected(MANAGE_CONTAINER_PERM,
         'manage_changeTransientObjectContainer')
@@ -868,9 +971,17 @@
 
         # f/w compat: 2.8 cannot use __len__ as an instance variable
         if not state.has_key('_length'):
-            length = state.get('__len__', Length())
+            length = state.get('__len__', Length2())
             self._length = self.getLen = length
 
+        oldlength = state['_length']
+        if isinstance(oldlength, BTreesLength):
+            # TOCS prior to 2.7.3 had a BTrees.Length.Length object as
+            # the TOC length object, replace it with our own Length2
+            # that does our conflict resolution correctly:
+            sz = oldlength()
+            self._length = self.getLen = Length2(sz)
+
         # TOCs prior to 2.7.1 took their period from a global
         if not state.has_key('_period'):
             self._period = 20 # this was the default for all prior releases
@@ -891,6 +1002,10 @@
         if not state.has_key('_last_finalized_timeslice'):
             self._last_finalized_timeslice = Increaser(-self._period)
 
+        # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+        if not state.has_key('_last_gc_timeslice'):
+            self._last_gc_timeslice = Increaser(-self._period)
+
         # we should probably delete older attributes from state such as
         # '_last_timeslice', '_deindex_next',and '__len__' here but we leave
         # them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -919,6 +1034,22 @@
         l.insert(0, begin + (x * period))
     return l
 
+def roll(low, high, reason):
+    try:
+        result = random.randrange(low, high)
+    except ValueError:
+        # empty range, must win this roll
+        result = low
+
+    if result == low:
+        DEBUG and TLOG('roll: low: %s, high: %s: won with %s (%s)' %
+                       (low, high, result, reason))
+        return True
+    else:
+        DEBUG and TLOG('roll: low: %s, high: %s: lost with %s (%s)' %
+                       (low, high, result, reason))
+        return False
+
 def _assert(case):
     if not case:
         raise AssertionError
@@ -926,8 +1057,8 @@
 class Increaser(Persistent):
     """
     A persistent object representing a typically increasing integer that
-    has conflict resolution uses the greatest integer out of the three
-    available states
+    has conflict resolution which uses the greatest integer out of the three
+    available states.
     """
     def __init__(self, v):
         self.value = v
@@ -947,7 +1078,51 @@
     def _p_resolveConflict(self, old, state1, state2):
         return max(old, state1, state2)
 
-    def _p_independent(self):
-        return 1
+
+class Length2(Persistent):
+    """
+    A persistent object responsible for maintaining a repesention of
+    the number of current transient objects.
+
+    Conflict resolution is sensitive to which methods are used to
+    change the length.
+    """
+    def __init__(self, value=0):
+        self.set(value)
+
+    def set(self, value):
+        self.value = value
+        self.floor = 0
+        self.ceiling = value
+
+    def increment(self, delta):
+        """Increase the length by delta.
+
+        Conflict resolution will take the sum of all the increments."""
+        self.ceiling += delta
+        self.value += delta
+
+    def decrement(self, delta):
+        """Decrease the length by delta.
+
+        Conflict resolution will take the highest decrement."""
+        self.floor += delta
+        self.value -= delta
+
+    def __getstate__(self):
+        return self.__dict__
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+
+    def __call__(self):
+        return self.value
+
+    def _p_resolveConflict(self, old, saved, new):
+        new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
+        new['floor'] = max(old['floor'], saved['floor'], new['floor'])
+        new['value'] = new['ceiling'] - new['floor']
+        return new
 
 Globals.InitializeClass(TransientObjectContainer)
+


=== Products/Transience/TransientObject.py 1.9.68.4 => 1.9.68.5 ===
--- Products/Transience/TransientObject.py:1.9.68.4	Sun Sep  5 01:57:07 2004
+++ Products/Transience/TransientObject.py	Fri Sep 17 22:58:19 2004
@@ -193,71 +193,59 @@
     # Other non interface code
     #
 
-    def _p_independent(self):
-        # My state doesn't depend on or materially effect the state of
-        # other objects (eliminates read conflicts).
-        return 1
-
     def _p_resolveConflict(self, saved, state1, state2):
         DEBUG and TLOG('entering TO _p_rc')
         DEBUG and TLOG('states: sv: %s, s1: %s, s2: %s' % (
             saved, state1, state2))
-        try:
-            states = [saved, state1, state2]
+        states = [saved, state1, state2]
 
-            # We can clearly resolve the conflict if one state is invalid,
-            # because it's a terminal state.
-            for state in states:
-                if state.has_key('_invalid'):
-                    DEBUG and TLOG('TO _p_rc: a state was invalid')
-                    return state
-            # The only other times we can clearly resolve the conflict is if
-            # the token, the id, or the creation time don't differ between
-            # the three states, so we check that here.  If any differ, we punt
-            # by raising ConflictError.
-            attrs = ['token', 'id', '_created']
-            for attr in attrs:
-                svattr = saved.get(attr)
-                s1attr = state1.get(attr)
-                s2attr = state2.get(attr)
-                DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
-                               (attr, svattr, s1attr, s2attr))
-                if not svattr==s1attr==s2attr:
-                    DEBUG and TLOG('TO _p_rc: cant resolve conflict')
-                    raise ConflictError
+        # We can clearly resolve the conflict if one state is invalid,
+        # because it's a terminal state.
+        for state in states:
+            if state.has_key('_invalid'):
+                DEBUG and TLOG('TO _p_rc: a state was invalid')
+                return state
 
-            # Now we need to do real work.
-            #
-            # Data in our _container dictionaries might conflict.  To make
-            # things simple, we intentionally create a race condition where the
-            # state which was last modified "wins".  It would be preferable to
-            # somehow merge our _containers together, but as there's no
-            # generally acceptable way to union their states, there's not much
-            # we can do about it if we want to be able to resolve this kind of
-            # conflict.
+        # The only other times we can clearly resolve the conflict is if
+        # the token, the id, or the creation time don't differ between
+        # the three states, so we check that here.  If any differ, we punt
+        # by raising ConflictError.
+        attrs = ['token', 'id', '_created']
+        for attr in attrs:
+            svattr = saved.get(attr)
+            s1attr = state1.get(attr)
+            s2attr = state2.get(attr)
+            DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
+                           (attr, svattr, s1attr, s2attr))
+            if not svattr==s1attr==s2attr:
+                DEBUG and TLOG('TO _p_rc: cant resolve conflict')
+                raise ConflictError
 
-            # We return the state which was most recently modified, if
-            # possible.
-            states.sort(lastmodified_sort)
-            if states[0].get('_last_modified'):
-                DEBUG and TLOG('TO _p_rc: returning last mod state')
-                return states[0]
+        # Now we need to do real work.
+        #
+        # Data in our _container dictionaries might conflict.  To make
+        # things simple, we intentionally create a race condition where the
+        # state which was last modified "wins".  It would be preferable to
+        # somehow merge our _containers together, but as there's no
+        # generally acceptable way to union their states, there's not much
+        # we can do about it if we want to be able to resolve this kind of
+        # conflict.
 
-            # If we can't determine which object to return on the basis
-            # of last modification time (no state has been modified), we return
-            # the object that was most recently accessed (last pulled out of
-            # our parent).  This will return an essentially arbitrary state if
-            # all last_accessed values are equal.
-            states.sort(lastaccessed_sort)
-            DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+        # We return the state which was most recently modified, if
+        # possible.
+        states.sort(lastmodified_sort)
+        if states[0].get('_last_modified'):
+            DEBUG and TLOG('TO _p_rc: returning last mod state')
             return states[0]
-        except ConflictError:
-            raise
-        except:
-            LOG('Transience', INFO,
-                'Conflict resolution error in TransientObject', '',
-                sys.exc_info()
-                )
+
+        # If we can't determine which object to return on the basis
+        # of last modification time (no state has been modified), we return
+        # the object that was most recently accessed (last pulled out of
+        # our parent).  This will return an essentially arbitrary state if
+        # all last_accessed values are equal.
+        states.sort(lastaccessed_sort)
+        DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+        return states[0]
 
     getName = getId # this is for SQLSession compatibility
 



More information about the Zope-Checkins mailing list