[Zope3-checkins] CVS: Zope3/lib/python/ZODB - Connection.py:1.90

Jeremy Hylton jeremy@zope.com
Wed, 4 Dec 2002 16:46:27 -0500


Update of /cvs-repository/Zope3/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv16871/lib/python/ZODB

Modified Files:
	Connection.py 
Log Message:
Fix race condition handling invalidation messages.

The _invalidated dict/set is passed to the cache.  If another thread
calls invalidate() while the cache is processing the invalidations,
the dictionary will raise a RuntimeError (dictionary changed size
during iteration).

It would be possible to fix the invalidateMany() problem without a
lock by passing a copy, e.g. the keys(), instead of the actual
dictionary, but there's a second race.  If another thread calls
invalidate() between the invalidateMany() call and the clear() call,
the invalidation will be lost.

The second race exists in ZODB3.  IIRC, Jim was aware of the race but
didn't think it was significant.  Still seems worth fixing in ZODB3.


=== Zope3/lib/python/ZODB/Connection.py 1.89 => 1.90 ===
--- Zope3/lib/python/ZODB/Connection.py:1.89	Tue Dec  3 12:24:12 2002
+++ Zope3/lib/python/ZODB/Connection.py	Wed Dec  4 16:46:27 2002
@@ -60,6 +60,7 @@
 import cPickle
 from cStringIO import StringIO
 import sys
+import threading
 import time
 from types import StringType, ClassType, TupleType
 
@@ -95,6 +96,9 @@
         self._reader = ConnectionObjectReader(self, self._cache)
 
         # _invalidated queues invalidate messages delivered from the DB
+        # _inv_lock prevents one thread from modifying the set while
+        # another is processing invalidations
+        self._inv_lock = threading.Lock()
         self._invalidated = Set()
         self._committed = []
         
@@ -174,10 +178,14 @@
 
             if invalid:
                 if object._p_independent():
+                    self._inv_lock.acquire()
                     try:
-                        del self._invalidated[oid]
-                    except KeyError:
-                        pass
+                        try:
+                            del self._invalidated[oid]
+                        except KeyError:
+                            pass
+                    finally:
+                        self._inv_lock.release()
                 else:
                     get_transaction().join(self)
                     raise ConflictError(object=object)
@@ -265,7 +273,21 @@
         transaction boundaries.
         """
         assert oid is not None
-        self._invalidated.add(oid)
+        self._inv_lock.acquire()
+        try:
+            self._invalidated.add(oid)
+        finally:
+            self._inv_lock.release()
+
+    def _flush_invalidations(self):
+        self._inv_lock.acquire()
+        try:
+            self._cache.invalidateMany(self._invalidated)
+            self._invalidated.clear()
+        finally:
+            self._inv_lock.release()
+        # Now is a good time to collect some garbage
+        self._cache.incrgc()
 
     def objcommit(self, object, transaction):
         oid = object._p_oid
@@ -375,6 +397,7 @@
             self.objcommit(obj, txn)
         self.importHook(txn) # hook for ExportImport
 
+        # The tpc_finish() of TmpStore returns an UndoInfo object.
         undo = self._storage.tpc_finish(txn)
         self._storage._created = self._created
         self._created = Set()
@@ -391,12 +414,10 @@
         if objs is not None:
             self._cache.invalidateMany([obj._p_oid for obj in objs])
             del self._txns[txn]
-        self._cache.invalidateMany(self._invalidated)
+        self._flush_invalidations()
         self._cache.invalidateMany(self._modified)
         self._invalidate_created(self._created)
         self._created = Set()
-
-        self._invalidated.clear()
         self._modified.clear()
 
     def prepare(self, txn):
@@ -465,15 +486,16 @@
         # invalidation message to all of the other connections!
 
         self._db.begin_invalidation()
+        # XXX We should really have a try/finally because the begin
+        # call acquired a lock that will only be released in
+        # _invalidate_modified().
         self._storage.tpc_finish(txn, self._invalidate_modified)
         try:
             del self._txns[txn]
         except KeyError:
             pass
 
-        self._cache.invalidateMany(self._invalidated)
-        self._invalidated.clear()
-        self._cache.incrgc() 
+        self._flush_invalidations()
 
     def _invalidate_modified(self):
         for oid in self._modified:
@@ -481,14 +503,12 @@
         self._db.finish_invalidation()
 
     def sync(self):
+        # XXX Is it safe to abort things right now?
         get_transaction().abort()
         sync = getattr(self._storage, 'sync', None)
         if sync is not None:
             sync()
-        # XXX race condition?
-        self._cache.invalidateMany(self._invalidated)
-        self._invalidated.clear()
-        self._cache.incrgc() # This is a good time to do some GC
+        self._flush_invalidations()
         
 class Rollback:
     """Rollback changes associated with savepoint"""