[Zope-Checkins] CVS: Zope/lib/python/Products/Transience -
Fake.py:1.1.2.1 HowTransienceWorks.stx:1.1.74.2
Transience.py:1.32.12.7
Chris McDonough
chrism at plope.com
Sun May 30 03:57:06 EDT 2004
Update of /cvs-repository/Zope/lib/python/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv31944
Modified Files:
Tag: Zope-2_7-branch
HowTransienceWorks.stx Transience.py
Added Files:
Tag: Zope-2_7-branch
Fake.py
Log Message:
- "Bucket finalization" is now done more aggressively. Instead of waiting
until a bucket is garbage collected (which may be much later than the
expiration of that bucket), we "finalize" a bucket as soon as possible
after it gets expired. This effectively means that the "delete notifier"
will be called much closer to the time that a transient object actually
expires.
- Add a "_last_finalized_timeslice" counter; this counter tracks the
timeslice of the bucket that was most recently finalized. Set the initial
value of "_last_finalized_timeslice" to -period; this supports the unit
tests for finalization, where a timeslice of 0 can actually occur and
that bucket still needs to be finalized.
- Add a series of locks to prevent finalization, replentishment, and
garbage collection from being attempted by more than one thread
simultaneously (see the locking sketch after this log message).
- Add a "Fake" module for interactive testing (it lets the BTree be
swapped out for a simpler object during stress tests, to isolate
problems).
- Allow DATA_CLASS to be specified (this is the main data structure
class).
- Update docs and tests to account for new finalization strategy.
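
For context, the three new locks follow a simple "try to acquire,
otherwise skip" pattern. Below is a minimal, stand-alone sketch of that
pattern; the real TransientObjectContainer allocates finalize_lock,
replentish_lock and gc_lock with thread.allocate_lock() and calls
acquire(0), as the Transience.py diff further down shows, so the class
name, the use of the threading module and the empty method body here are
illustrative only.

import threading

class LockingSketch:
    # class-level (non-persistent) locks, one per housekeeping task,
    # mirroring finalize_lock / replentish_lock / gc_lock
    finalize_lock = threading.Lock()

    def _finalize(self, now):
        # non-blocking acquire: if another thread is already finalizing,
        # skip this opportunity; a later call will pick the work up
        if not self.finalize_lock.acquire(False):
            return
        try:
            pass  # walk expired buckets, call the delete notifier, etc.
        finally:
            self.finalize_lock.release()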
=== Added File Zope/lib/python/Products/Transience/Fake.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Module used for testing transience (BTree-API-conforming data structure)
"""
from ZODB.PersistentMapping import PersistentMapping
import sys

class FakeIOBTree(PersistentMapping):
    def keys(self, min, max):
        L = []
        if min is None:
            min = 0
        if max is None:
            max = sys.maxint
        for k in self.data:
            if min <= k <= max:
                L.append(k)
        return L
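
A quick, hypothetical interactive session (assuming a Zope 2.7 instance
where Products.Transience is importable): FakeIOBTree only has to answer
the same keys(min, max) range query that the transience machinery makes
against the real IOBTree, so a stress test can swap it in for the main
data structure.

from Products.Transience.Fake import FakeIOBTree

data = FakeIOBTree()
data[0] = 'bucket for timeslice 0'
data[20] = 'bucket for timeslice 20'
data[40] = 'bucket for timeslice 40'

# None bounds act as open ends, as with the IOBTree range API
print(data.keys(None, 20))   # keys <= 20 (order is not guaranteed)
print(data.keys(20, None))   # keys >= 20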
=== Zope/lib/python/Products/Transience/HowTransienceWorks.stx 1.1.74.1 => 1.1.74.2 ===
--- Zope/lib/python/Products/Transience/HowTransienceWorks.stx:1.1.74.1 Fri May 14 18:52:12 2004
+++ Zope/lib/python/Products/Transience/HowTransienceWorks.stx Sun May 30 03:56:35 2004
@@ -32,7 +32,7 @@
Data Structures Maintained by a Transient Object Container
- The TOC maintains five important kinds of data structures:
+ The TOC maintains three important kinds of data structures:
- a "_data" structure, which is an IOBTree mapping a "timeslice"
integer to a "bucket" (see next bullet for definition of bucket).
@@ -43,8 +43,11 @@
"current" bucket, which is the bucket that is contained within the
_data structured with a key equal to the "current" timeslice.
- - A "max_timeslice" integer, which is equal to the "largest" timeslice
- for which there exists a bucket in the _data structure.
+ - A "max_timeslice" integer, which is equal to the "largest"
+ timeslice for which there exists a bucket in the _data structure.
+ This is an optimization given that key operations against BTrees
+ can be slow and could cause conflicts (the same could be achieved
+ via _data.maxKey() otherwise).
When a Transient Object is created via new_or_existing, it is added
to the "current" bucket. As time goes by, the bucket to which the
@@ -67,25 +70,25 @@
All "current" timeslice buckets (as specified by the timeout) are
searched for the transient object, most recent bucket first.
-Housekeeping: Notification, Garbage Collection, and Bucket
+Housekeeping: Finalization, Garbage Collection, and Bucket
Replentishing
- The TOC performs "notification", "garbage collection", and "bucket
+ The TOC performs "finalization", "garbage collection", and "bucket
replentishing". It performs these tasks "in-band". This means that
the TOC does not maintain a separate thread that wakes up every so
often to do these housekeeping tasks. Instead, during the course of
normal operations, the TOC opportunistically performs them.
+ Finalization is defined as optionally calling a function at bucket
+ expiration time against all transient objects contained within that
+ bucket. The optional function call is user-defined, but it is
+ managed by the "notifyDel" method of the TOC.
+
Garbage collection is defined as deleting "expired" buckets in the
_data structure (the _data structure maps a timeslice to a bucket).
Typically this is done by throwing away one or more buckets in the
_data structure after they expire.
- Notification is defined as optionally calling a function at TOC
- finalization time against individual transient object contained
- within a bucket. The optional function call is user-defined, but it
- is managed by the "notifyDel" method of the TOC.
-
Bucket replentishing is defined as the action of (opportunistically)
creating more buckets to insert into the the _data structure,
replacing ones that are deleted during garbage collection. The act
@@ -93,7 +96,9 @@
will be immediately created thereafter. We create new buckets in
batches to reduce the possibility of conflicts.
- Housekeeping is performed on a somewhat random basis to avoid
+ Finalization is attempted on every call to the transience machinery
+ to make TOs appear to expire "on time". Garbage collection and
+ replentishment is performed on a somewhat random basis to avoid
unnecessary conflicts.
Goals
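
To make the bucket motion described above concrete, here is a tiny,
non-persistent model of it: plain dicts stand in for the IOBTree and the
OOBTree buckets, and the timeslice computation is an assumption about
how getCurrentTimeslice behaves (the wall clock rounded down to the
period); only the shape of the data structures comes from the document
above.

PERIOD = 20                               # seconds per timeslice

def current_timeslice(now):
    # assumption: a timeslice is the clock rounded down to the period
    return int(now / PERIOD) * PERIOD

_data = {0: {}, 20: {}, 40: {}}           # timeslice -> bucket
_data[0]['key'] = 'a transient object'    # created while 0 was current

def move_item(k, now):
    current = current_timeslice(now)
    for ts in sorted(_data, reverse=True):        # most recent bucket first
        if k in _data[ts]:
            _data[current][k] = _data[ts].pop(k)  # move into current bucket
            return _data[current][k]
    return None                                   # expired or never stored

print(move_item('key', now=45))   # found in bucket 0, moved to bucket 40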
=== Zope/lib/python/Products/Transience/Transience.py 1.32.12.6 => 1.32.12.7 ===
--- Zope/lib/python/Products/Transience/Transience.py:1.32.12.6 Sat May 22 23:51:55 2004
+++ Zope/lib/python/Products/Transience/Transience.py Sun May 30 03:56:35 2004
@@ -44,6 +44,7 @@
from zLOG import LOG, WARNING, INFO
from TransientObject import TransientObject
+from Fake import FakeIOBTree
ADD_CONTAINER_PERM = 'Add Transient Object Container'
MGMT_SCREEN_PERM = 'View management screens'
@@ -54,6 +55,7 @@
SPARE_BUCKETS = 15 # minimum number of buckets to keep "spare"
BUCKET_CLASS = OOBTree # constructor for buckets
+DATA_CLASS = IOBTree # const for main data structure (timeslice->"bucket")
STRICT = os.environ.get('Z_TOC_STRICT', '')
DEBUG = int(os.environ.get('Z_TOC_DEBUG', 0))
@@ -130,6 +132,14 @@
security.setDefaultAccess('deny')
+ # intitialize locks used for finalization, replentishing, and
+ # garbage collection (used in _finalize, _replentish, and _gc
+ # respectively)
+
+ finalize_lock = thread.allocate_lock()
+ replentish_lock = thread.allocate_lock()
+ gc_lock = thread.allocate_lock()
+
def __init__(self, id, title='', timeout_mins=20, addNotification=None,
delNotification=None, limit=0, period_secs=20):
self.id = id
@@ -190,13 +200,14 @@
# "bucket". Each bucket will contain a set of transient items.
# Transient items move automatically from bucket-to-bucket inside
# of the _data structure based on last access time (e.g.
- # "get" calls), escaping destruction only if they move quickly
- # enough.
+ # "get" calls), escaping expiration and eventual destruction only if
+ # they move quickly enough.
+ #
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
# _gc() and _replentish()).
- self._data = IOBTree()
+ self._data = DATA_CLASS()
# populate _data with some number of buckets, each of which
# is "current" for its timeslice key
@@ -207,15 +218,27 @@
self._period)
for i in new_slices:
self._data[i] = BUCKET_CLASS()
- # create an Increaser for max timeslice
+ # max_timeslice is at any time during operations the highest
+ # key value in _data. Its existence is an optimization; getting
+ # the maxKey of a BTree directly is read-conflict-prone.
self._max_timeslice = Increaser(max(new_slices))
else:
self._data[0] = BUCKET_CLASS() # sentinel value for non-expiring
self._max_timeslice = Increaser(0)
- # our "_length" is the length of _index.
+ # '_last_finalized_timeslice' is a value that indicates which
+ # timeslice had its items last run through the finalization
+ # process. The finalization process calls the delete notifier for
+ # each expired item.
+ self._last_finalized_timeslice = Increaser(-self._period)
+
+ # our "_length" is the number of "active" data objects in _data.
+ # it does not include items that are still kept in _data but need to
+ # be garbage collected.
+ #
# we need to maintain the length of the index structure separately
- # because getting the length of a BTree is very expensive.
+ # because getting the length of a BTree is very expensive, and it
+ # doesn't really tell us which ones are "active" anyway.
try: self._length.set(0)
except AttributeError: self._length = self.getLen = Length()
@@ -241,77 +264,83 @@
return result
def _move_item(self, k, current_ts, default=None):
- if self._timeout_slices:
+ if not self._timeout_slices:
+ # special case for no timeout value
+ bucket = self._data.get(0)
+ return bucket.get(k, default)
- if self._roll(current_ts, 'replentish'):
- self._replentish(current_ts)
+ # always call finalize
+ self._finalize(current_ts)
- if self._roll(current_ts, 'gc'):
- self._gc(current_ts)
+ # call gc and/or replentish on an only-as needed basis
+ if self._roll(current_ts, 'replentish'):
+ self._replentish(current_ts)
- STRICT and _assert(self._data.has_key(current_ts))
- current = self._getCurrentSlices(current_ts)
- found_ts = None
+ if self._roll(current_ts, 'gc'):
+ self._gc(current_ts)
- for ts in current:
- bucket = self._data.get(ts)
- DEBUG and TLOG(
- '_move_item: bucket for ts %s is %s' % (ts, id(bucket)))
- # dont use hasattr here (it hides conflict errors)
- if getattr(bucket, 'has_key', None):
- if DEBUG:
- keys = list(bucket.keys())
- DEBUG and TLOG(
- '_move_item: keys for ts %s (bucket %s)-- %s' %
- (ts, id(bucket), str(keys))
- )
- if bucket.has_key(k):
- found_ts = ts
- break
-
- DEBUG and TLOG('_move_item: found_ts is %s' % found_ts)
- if found_ts is None:
- DEBUG and TLOG('_move_item: returning default of %s' % default)
- return default
+ # SUBTLETY ALERTY TO SELF: do not "improve" the code below
+ # unnecessarily, as it will end only in tears. The lack of aliases
+ # and the ordering is intentional.
- bucket = self._data[found_ts]
- item = bucket[k]
+ STRICT and _assert(self._data.has_key(current_ts))
+ current_slices = self._getCurrentSlices(current_ts)
+ found_ts = None
- if current_ts != found_ts:
- DEBUG and TLOG(
- '_move_item: moving item %s from %s to %s' % (
- k, found_ts, current_ts))
- del bucket[k]
- if not issubclass(BUCKET_CLASS, Persistent):
- # tickle persistence machinery
- self._data[found_ts] = bucket
- DEBUG and TLOG(
- '_move_item: deleted key %s from bucket %s' % (
- k,id(bucket))
- )
- if DEBUG:
- keys = list(bucket.keys())
- DEBUG and TLOG(
- '_move_item: keys for found_ts %s (bucket %s): %s' % (
- found_ts, id(bucket), str(keys))
- )
- STRICT and _assert(bucket.get(k, None) is None)
- STRICT and _assert(not bucket.has_key(k))
- current_bucket = self._data[current_ts]
- current_bucket[k] = item
- if not issubclass(BUCKET_CLASS, Persistent):
- # tickle persistence machinery
- self._data[current_ts] = current_bucket
+ for ts in current_slices:
+ abucket = self._data.get(ts, None)
+ if abucket is None:
+ DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
+ continue
+ DEBUG and TLOG(
+ '_move_item: bucket for ts %s is %s' % (ts, id(abucket)))
+ DEBUG and TLOG(
+ '_move_item: keys for ts %s (bucket %s)-- %s' %
+ (ts, id(abucket), str(list(abucket.keys())))
+ )
+ # uhghost?
+ if abucket.get(k, None) is not None:
+ found_ts = ts
+ break
- else:
- # special case for no timeout value
- bucket = self._data.get(0)
- item = bucket.get(k, default)
+ DEBUG and TLOG('_move_item: found_ts is %s' % found_ts)
- # dont use hasattr here (it hides conflict errors)
- if getattr(item, 'setLastAccessed', None):
- item.setLastAccessed()
- return item
+ if found_ts is None:
+ DEBUG and TLOG('_move_item: returning default of %s' % default)
+ return default
+
+ if found_ts != current_ts:
+
+ DEBUG and TLOG('_move_item: current_ts (%s) != found_ts (%s), '
+ 'moving to current' % (current_ts, found_ts))
+ DEBUG and TLOG(
+ '_move_item: keys for found_ts %s (bucket %s): %s' % (
+ found_ts, id(self._data[found_ts]),
+ `list(self._data[found_ts].keys())`)
+ )
+ self._data[current_ts][k] = self._data[found_ts][k]
+ if not issubclass(BUCKET_CLASS, Persistent):
+ # tickle persistence machinery
+ self._data[current_ts] = self._data[current_ts]
+ DEBUG and TLOG(
+ '_move_item: copied item %s from %s to %s (bucket %s)' % (
+ k, found_ts, current_ts, id(self._data[current_ts])))
+ del self._data[found_ts][k]
+ if not issubclass(BUCKET_CLASS, Persistent):
+ # tickle persistence machinery
+ self._data[found_ts] = self._data[found_ts]
+ DEBUG and TLOG(
+ '_move_item: deleted item %s from ts %s (bucket %s)' % (
+ k, found_ts, id(self._data[found_ts]))
+ )
+ STRICT and _assert(self._data[found_ts].get(k, None) is None)
+ STRICT and _assert(not self._data[found_ts].has_key(k))
+
+ if getattr(self._data[current_ts][k], 'setLastAccessed', None):
+ self._data[current_ts][k].setLastAccessed()
+ DEBUG and TLOG('_move_item: returning %s from current_ts %s '
+ % (k, current_ts))
+ return self._data[current_ts][k]
def _all(self):
if self._timeout_slices:
@@ -319,6 +348,8 @@
else:
current_ts = 0
+ self._finalize(current_ts)
+
if self._roll(current_ts, 'replentish'):
self._replentish(current_ts)
@@ -485,30 +516,126 @@
DEBUG and TLOG('_roll: %s rigged toss' % reason)
return True
else:
- # we're not in an emergency bucket shortage, so we can take
- # our chances during the roll. It's highly unlikely that two
- # threads will win the roll simultaneously, so we avoid a certain
- # class of conflicts here.
+ # we're not in an emergency bucket shortage, so we can
+ # take our chances during the roll. It's unlikely that
+ # two threads will win the roll simultaneously, so we
+ # avoid a certain class of conflicts here.
if random.randrange(low, high) == low: # WINNAH!
DEBUG and TLOG("_roll: %s roll winner" % reason)
return True
DEBUG and TLOG("_roll: %s roll loser" % reason)
return False
+ def _get_max_expired_ts(self, now):
+ return now - (self._period * (self._timeout_slices + 1))
+
+ def _finalize(self, now):
+ if not self._timeout_slices:
+ DEBUG and TLOG('_finalize: doing nothing (no timeout)')
+ return # don't do any finalization if there is no timeout
+
+ # The nature of sessioning is that when the timeslice rolls
+ # over, all active threads will try to do a lot of work during
+ # finalization, all but one unnecessarily. We really don't
+ # want more than one thread at a time to try to finalize
+ # buckets at the same time so we try to lock. We give up if we
+ # can't lock immediately because it doesn't matter if we skip
+ # a couple of opportunities for finalization, as long as it
+ # gets done by some thread eventually. A similar pattern
+ # exists for _gc and _replentish.
+
+ if not self.finalize_lock.acquire(0):
+ DEBUG and TLOG('_finalize: couldnt acquire lock')
+ return
+
+ try:
+ DEBUG and TLOG('_finalize: lock acquired successfully')
+
+ if now is None:
+ now = getCurrentTimeslice(self._period) # for unit tests
+
+ # we want to start finalizing from one timeslice after the
+ # timeslice which we last finalized. Note that finalizing
+ # an already-finalized bucket somehow sends persistence
+ # into a spin with an exception later raised:
+ # "SystemError: error return without exception set",
+ # typically coming from
+ # Products.Sessions.SessionDataManager, line 182, in
+ # _getSessionDataObject (if getattr(ob, '__of__', None)
+ # and getattr(ob, 'aq_parent', None)). According to this
+ # email message from Jim, it may be because the ob is
+ # ghosted and doesn't have a _p_jar somehow:
+ #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
+
+ start_finalize = self._last_finalized_timeslice() + self._period
+
+ # we want to finalize only up to the maximum expired timeslice
+ max_ts = self._get_max_expired_ts(now)
+
+ if start_finalize >= max_ts:
+ DEBUG and TLOG(
+ '_finalize: start_finalize (%s) >= max_ts (%s), '
+ 'doing nothing' % (start_finalize, max_ts))
+ return
+
+ DEBUG and TLOG('_finalize: now is %s' % now)
+ DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
+ DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
+
+ to_finalize = list(self._data.keys(start_finalize, max_ts))
+ DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
+
+ delta = 0
+
+ for key in to_finalize:
+
+ assert(start_finalize <= key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ values = list(self._data[key].values())
+ DEBUG and TLOG('_finalize: values to notify from ts %s '
+ 'are %s' % (key, `list(values)`))
+
+ delta += len(values)
+
+ for v in values:
+ self.notifyDel(v)
+
+ if delta:
+ self._length.change(-delta)
+
+ DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
+ 'to max_ts of %s' % max_ts)
+
+ self._last_finalized_timeslice.set(max_ts)
+
+ finally:
+ self.finalize_lock.release()
+
def _replentish(self, now):
# available_spares == the number of "spare" buckets that exist in
# "_data"
if not self._timeout_slices:
return # do nothing if no timeout
- max_ts = self._max_timeslice()
- available_spares = (max_ts-now) / self._period
- DEBUG and TLOG('_replentish: now = %s' % now)
- DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
- DEBUG and TLOG('_replentish: available_spares = %s'
- % available_spares)
+ if not self.replentish_lock.acquire(0):
+ DEBUG and TLOG('_replentish: couldnt acquire lock')
+ return
+
+ try:
+ max_ts = self._max_timeslice()
+ available_spares = (max_ts-now) / self._period
+ DEBUG and TLOG('_replentish: now = %s' % now)
+ DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
+ DEBUG and TLOG('_replentish: available_spares = %s'
+ % available_spares)
+
+ if available_spares >= SPARE_BUCKETS:
+ DEBUG and TLOG('_replentish: available_spares (%s) >= '
+ 'SPARE_BUCKETS (%s), doing '
+ 'nothing'% (available_spares,
+ SPARE_BUCKETS))
+ return
- if available_spares < SPARE_BUCKETS:
if max_ts < now:
replentish_start = now
replentish_end = now + (self._period * SPARE_BUCKETS)
@@ -538,33 +665,35 @@
time.sleep(random.uniform(0, 1)) # add entropy
raise
self._max_timeslice.set(max(new_buckets))
+ finally:
+ self.replentish_lock.release()
def _gc(self, now=None):
if not self._timeout_slices:
return # dont do gc if there is no timeout
- if now is None:
- now = getCurrentTimeslice(self._period) # for unit tests
- max_ts = now - (self._period * (self._timeout_slices + 1))
-
- to_notify = []
-
- DEBUG and TLOG('_gc: now is %s' % now)
- DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
-
- for key in list(self._data.keys(None, max_ts)):
- assert(key <= max_ts)
- STRICT and _assert(self._data.has_key(key))
-
- for v in self._data[key].values():
- to_notify.append(v)
- self._length.change(-1)
-
- DEBUG and TLOG('deleting %s from _data' % key)
- del self._data[key]
+ if not self.gc_lock.acquire(0):
+ DEBUG and TLOG('_gc: couldnt acquire lock')
+ return
- for v in to_notify:
- self.notifyDel(v)
+ try:
+ if now is None:
+ now = getCurrentTimeslice(self._period) # for unit tests
+
+ # we want to garbage collect all buckets that have already been run
+ # through finalization
+ max_ts = self._last_finalized_timeslice()
+
+ DEBUG and TLOG('_gc: now is %s' % now)
+ DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
+
+ for key in list(self._data.keys(None, max_ts)):
+ assert(key <= max_ts)
+ STRICT and _assert(self._data.has_key(key))
+ DEBUG and TLOG('deleting %s from _data' % key)
+ del self._data[key]
+ finally:
+ self.gc_lock.release()
def notifyAdd(self, item):
DEBUG and TLOG('notifyAdd with %s' % item)
@@ -758,6 +887,9 @@
self._data[i] = BUCKET_CLASS()
# create an Increaser for max timeslice
self._max_timeslice = Increaser(max(new_slices))
+
+ if not state.has_key('_last_finalized_timeslice'):
+ self._last_finalized_timeslice = Increaser(-self._period)
# we should probably delete older attributes from state such as
# '_last_timeslice', '_deindex_next',and '__len__' here but we leave
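
Taken together, the ordering in the code above is: _finalize walks the
buckets from one timeslice past _last_finalized_timeslice up to
_get_max_expired_ts(now), calls the delete notifier for every value in
them, and then records that maximum; _gc afterwards deletes only buckets
at or below _last_finalized_timeslice, so no bucket is thrown away
before its delete notifier has run. A small arithmetic sketch of the
finalization window (the constants and the helper function are
illustrative, not part of Transience.py):

PERIOD = 20            # period_secs
TIMEOUT_SLICES = 3     # e.g. a one-minute timeout with 20-second slices

def finalization_window(now, last_finalized):
    # everything at or before max_ts has expired and may be finalized
    max_ts = now - PERIOD * (TIMEOUT_SLICES + 1)
    # resume one slice after the bucket we finalized last time
    start = last_finalized + PERIOD
    return range(start, max_ts + 1, PERIOD)

# starting from the sentinel -PERIOD, the very first call covers
# timeslice 0 as well, which is why the counter is initialized to -period
print(list(finalization_window(now=100, last_finalized=-PERIOD)))  # [0, 20]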