[Zodb-checkins] CVS: Zope3/lib/python/ZODB - Connection.py:1.60.6.10 DB.py:1.34.4.6 TmpStore.py:1.4.86.2
Jeremy Hylton
jeremy@zope.com
Mon, 11 Mar 2002 23:53:44 -0500
Update of /cvs-repository/Zope3/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv23618
Modified Files:
Tag: Zope-3x-branch
Connection.py DB.py TmpStore.py
Log Message:
Refactor invalidation logic.
The old code contained instance variables with similar names and uses:
_invalidated and _invalidating. I've renamed _invalidating to _modified.
The _invalidated dictionary implements a set of oids. It contains all
of the oids modified by other connections since the current
transaction began. When the transaction finishes, the connection updates
its cache based on _invalidated and then resets _invalidated. (There may be a
race condition in the update-and-then-clear.)
The _modified instance variable is a list of oids modified by the
current transaction. It is reset each time tpc_begin() is called.
Since _invalidated and _modified are past tense, rename _creating to
_created. (This also affects TmpStore.)
There are many tests in the code using _invalid(). This is equivalent
to self._invalidated.has_key(). Many of these tests check the current
oid and also check None, e.g. self._invalid(None). It appears,
however, that it is impossible for None to be stored in _invalidated.
Add an assertion to this effect in the code and delete tests for
_invalid(None).
Refactor implementation of DB.invalidate() into several smaller
methods.
Fix typos in tpc_abort() reported by Steve Alexander and R. David Murray.
Rename Connection _setDB() method to registerDB().
=== Zope3/lib/python/ZODB/Connection.py 1.60.6.9 => 1.60.6.10 ===
self._cache = cache = Cache(cache_size, cache_deactivate_after)
self.cacheGC = cache.incrgc
-
+
+ # _invalidated queues invalidate messages delivered from the DB
self._invalidated = d = {}
self._invalid = d.has_key
self._committed = []
@@ -238,8 +239,7 @@
# to avoid time-of-check to time-of-use race.
p, serial = self._storage.load(oid, self._version)
- # XXX What does _invalid(None) mean?
- if self._invalid(oid) or self._invalid(None):
+ if self._invalid(oid):
if not (hasattr(object, '_p_independent')
and object._p_independent()):
get_transaction().register(self)
@@ -295,6 +295,7 @@
# New code is in place. Start a new cache.
self._resetCache()
else:
+ # XXX race condition?
self._cache.invalidateMany(self._invalidated.iterkeys())
self._invalidated.clear()
self._opened = time.time()
@@ -361,6 +362,9 @@
def cacheMinimize(self, dt=0):
self._cache.minimize(dt)
+ def getVersion(self):
+ return self._version
+
def invalidate(self, oid):
"""Invalidate a particular oid
@@ -368,7 +372,8 @@
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
- # XXX can oid be None?
+ assert oid is not None
+ # XXX race condition?
self._invalidated[oid] = 1
######################################################################
@@ -380,6 +385,7 @@
def abort(self, object, transaction):
"""Invalidate the object (or all objects if None)."""
if object is self:
+ # XXX race condition?
self._cache.invalidateMany(self._invalidated.iterkeys())
self._invalidated.clear()
else:
@@ -400,14 +406,13 @@
oid = self.new_oid()
object._p_jar = self
object._p_oid = oid
- self._creating.append(oid)
+ self._created.append(oid)
elif object._p_changed:
# XXX Is it kosher to raise a ConflictError on commit?
- if ((self._invalid(oid)
- and not hasattr(object, '_p_resolveConflict'))
- or self._invalid(None)):
+ if (self._invalid(oid)
+ and not hasattr(object, '_p_resolveConflict')):
raise ConflictError(object=object)
- self._invalidating.append(oid)
+ self._modified.append(oid)
else:
return # Nothing to do
@@ -428,7 +433,7 @@
serial = getattr(pobject, '_p_serial', '\0\0\0\0\0\0\0\0')
if serial == '\0\0\0\0\0\0\0\0':
# new object
- self._creating.append(oid)
+ self._created.append(oid)
else:
#XXX We should never get here
#jer: Don't understand previous comment.
@@ -436,7 +441,7 @@
not hasattr(pobject, '_p_resolveConflict'))
or self._invalidated.has_key(None)):
raise ConflictError(oid=oid, serial=serial)
- self._invalidating.append(oid)
+ self._modified.append(oid)
klass = pobject.__class__
@@ -497,8 +502,8 @@
oids = src._index.keys()
# Copy invalidating and creating info from temporary storage:
- self._invalidating.extend(oids)
- self._creating.extend(src._creating)
+ self._modified.extend(oids)
+ self._created.extend(src._created)
for oid in oids:
data, serial = src.load(oid, src)
@@ -515,16 +520,16 @@
self._cache.invalidateMany(src._index.iterkeys())
src._index.clear()
- self._invalidate_creating(src._creating)
+ self._invalidate_created(src._created)
- def _invalidate_creating(self, creating=None):
+ def _invalidate_created(self, created=None):
"""Dissown any objects newly saved in an uncommitted transaction.
"""
- if creating is None:
- creating = self._creating
- self._creating = []
+ if created is None:
+ created = self._created
+ self._created = []
- for oid in creating:
+ for oid in created:
o = self._cache.get(oid)
if o is not None:
del o._p_jar
@@ -543,17 +548,18 @@
if self.__onCommitActions is not None:
del self.__onCommitActions
self._storage.tpc_abort(transaction)
+ # XXX race condition?
self._cache.invalidateMany(self._invalidated.iterkeys())
self._invalidated.clear()
- self._cache.invalidateMany(self._invalidating.iterkeys())
- self._invalidatng.clear()
- self._invalidate_creating()
+ self._cache.invalidateMany(self._modified)
+ del self._modified[:]
+ self._invalidate_created()
def tpc_begin(self, transaction, sub=None):
- if self._invalid(None): # Some nitwit invalidated everything!
- raise ConflictError("transaction already invalidated")
- self._invalidating = []
- self._creating = []
+ # _modified is a list of the oids of the objects modified
+ # by this transaction.
+ self._modified = []
+ self._created = []
if sub:
# Sub-transaction!
@@ -621,7 +627,7 @@
def tpc_finish(self, transaction):
# It's important that the storage call the function we pass
- # (self._invalidate_invalidating) while it still has it's
+ # (self._invalidate_modified) while it still has it's
# lock. We don't want another thread to be able to read any
# updated data until we've had a chance to send an
# invalidation message to all of the other connections!
@@ -630,19 +636,20 @@
# Commiting a subtransaction!
# There is no need to invalidate anything.
self._storage.tpc_finish(transaction)
- self._storage._creating[:0] = self._creating
- del self._creating[:]
+ self._storage._created[:0] = self._created
+ del self._created[:]
else:
self._db.begin_invalidation()
self._storage.tpc_finish(transaction,
- self._invalidate_invalidating)
+ self._invalidate_modified)
+ # XXX race condition?
self._cache.invalidateMany(self._invalidated.iterkeys())
self._invalidated.clear()
self.cacheGC() # This is a good time to do some GC
- def _invalidate_invalidating(self):
- for oid in self._invalidating:
+ def _invalidate_modified(self):
+ for oid in self._modified:
self._db.invalidate(oid, self)
self._db.finish_invalidation()
@@ -651,6 +658,7 @@
sync = getattr(self._storage, 'sync', None)
if sync is not None:
sync()
+ # XXX race condition?
self._cache.invalidateMany(self._invalidated.iterkeys())
self._invalidated.clear()
self.cacheGC() # This is a good time to do some GC
=== Zope3/lib/python/ZODB/DB.py 1.34.4.5 => 1.34.4.6 ===
self._r()
- def invalidate(self, oid, connection=None, version='',
- rc=sys.getrefcount):
+ def invalidate(self, oid, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
@@ -274,32 +273,53 @@
connection.
"""
if connection is not None:
- version=connection._version
- # Update modified in version cache
- h=hash(oid)%131
- o=self._miv_cache.get(h, None)
- if o is not None and o[0]==oid: del self._miv_cache[h]
+ version = connection._version
+
+ assert oid is not None
+
+ self.updateMIVCache(oid)
# Notify connections
for pool, allocated in self._pools[1]:
for cc in allocated:
- if (cc is not connection and
- (not version or cc._version==version)):
- if rc(cc) <= 3:
- cc.close()
- cc.invalidate(oid)
-
- temps=self._temps
- if temps:
- t=[]
- for cc in temps:
- if rc(cc) > 3:
- if (cc is not connection and
- (not version or cc._version==version)):
- cc.invalidate(oid)
- t.append(cc)
- else: cc.close()
- self._temps=t
+ if cc is not connection:
+ self.invalidateConnection(cc, version, oid)
+
+ if self._temps:
+ # t accumulates all the connections that aren't closed.
+ t = []
+ for cc in self._temps:
+ if cc is not connection:
+ self.invalidateConnection(cc, oid, version,
+ t.append)
+ self._temps = t
+
+ def invalidateConnection(self, conn, oid, version, alive=None):
+ """Send invalidation message to conn for oid on version.
+
+ If the modification occurred on a version, an invalidation is
+ sent only if the version of the mod matches the version of the
+ connection.
+
+ This function also handles garbage collection of connection's
+ that aren't used anymore. If the optional argument alive is
+ defined, it is a function that is called for all connections
+ that aren't garbage collected.
+ """
+
+ if sys.getrefcount(conn) <= 3:
+ conn.close()
+ else:
+ if alive is not None:
+ alive(conn)
+ if not version or conn.getVersion() == version:
+ conn.invalidate(oid)
+
+ def updateMIVCache(self, oid):
+ h = hash(oid) % 131
+ o = self._miv_cache.get(h)
+ if o is not None and o[0]==oid:
+ del self._miv_cache[h]
def modifiedInVersion(self, oid):
h=hash(oid)%131
=== Zope3/lib/python/ZODB/TmpStore.py 1.4.86.1 => 1.4.86.2 ===
file=tempfile.TemporaryFile()
- self._file=file
- self._index={}
- self._pos=self._tpos=0
- self._bver=base_version
- self._tindex=[]
- self._db=None
- self._creating=[]
+ self._file = file
+ self._index = {}
+ self._pos = self._tpos = 0
+ self._bver = base_version
+ self._tindex = []
+ self._db = None
+ self._created = []
- def __del__(self): self.close()
+ def __del__(self):
+ self.close()
def close(self):
self._file.close()
@@ -39,8 +40,11 @@
del self._index
del self._db
- def getName(self): return self._db.getName()
- def getSize(self): return self._pos
+ def getName(self):
+ return self._db.getName()
+
+ def getSize(self):
+ return self._pos
def load(self, oid, version):
#if version is not self: raise KeyError, oid
@@ -55,14 +59,16 @@
return file.read(u64(h[16:])), h[8:16]
def modifiedInVersion(self, oid):
- if self._index.has_key(oid): return 1
+ if self._index.has_key(oid):
+ return 1
return self._db._storage.modifiedInVersion(oid)
- def new_oid(self): return self._db._storage.new_oid()
+ def new_oid(self):
+ return self._db._storage.new_oid()
def registerDB(self, db, limit):
- self._db=db
- self._storage=db._storage
+ self._db = db
+ self._storage = db._storage
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: