[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.9.2.2

Jeremy Hylton jeremy@zope.com
Mon, 3 Mar 2003 17:16:50 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv1511

Modified Files:
      Tag: jeremy-atomic-invalidation-branch
	connection.py 
Log Message:
Several improvements for invalidation locking.

Add comment explaining why we need to use a lock even though
dict update() should be atomic.

Remove modifiedInVersion() which is not used anywhere.

Refactor setstate() to do invalidation checking in helper methods.

Remove duplicate checking of _invalidated.  It was checked once in
_objcommit() and once in _commit_store().  Every object checked in
_objcommit() was also checked in _commit_store(), so move the check
there.



=== Zope3/src/zodb/connection.py 1.9.2.1 => 1.9.2.2 ===
--- Zope3/src/zodb/connection.py:1.9.2.1	Sat Mar  1 21:36:54 2003
+++ Zope3/src/zodb/connection.py	Mon Mar  3 17:16:49 2003
@@ -92,7 +92,16 @@
 
         # _invalidated queues invalidate messages delivered from the DB
         # _inv_lock prevents one thread from modifying the set while
-        # another is processing invalidations
+        # another is processing invalidations.  All the invalidations
+        # from a single transaction should be applied atomically, so
+        # the lock must be held when reading _invalidated.
+
+        # XXX It sucks that we have to hold the lock to read
+        # _invalidated.  Normally, _invalidated is written by calling
+        # dict.update(), which will execute atomically by virtue of the
+        # GIL.  But some storage might generate oids where hash or
+        # compare invokes Python code.  In that case, the GIL can't
+        # save us.
         self._inv_lock = threading.Lock()
         self._invalidated = Set()
         self._committed = []
@@ -125,12 +134,6 @@
             sync()
         self._flush_invalidations()
 
-    def modifiedInVersion(self, oid):
-        try:
-            return self._storage.modifiedInVersion(oid)
-        except KeyError:
-            return self._version
-
     def get(self, oid):
         # assume that a cache cannot store None as a valid object
         object = self._cache.get(oid)
@@ -158,13 +161,10 @@
     # setstate(), register(), mtime()
 
     def setstate(self, obj):
+        # extremely paranoid: guard against obj not having an _p_oid.
         oid = None
-        # XXX Is it possible to reorganize the method-level try/except?
         try:
             oid = obj._p_oid
-
-            # XXX this is quite conservative!
-
             # Avoid reading data from a transaction that committed
             # after the current transaction started, as that might
             # lead to mixing of cached data from earlier transactions
@@ -173,34 +173,11 @@
             # Wait for check until after data is loaded from storage
             # to avoid time-of-check to time-of-use race.
             p, serial = self._storage.load(oid, self._version)
-
-            if oid in self._invalidated:
-                if not (hasattr(obj, '_p_independent')
-                        and obj._p_independent()):
-                    get_transaction().join(self)
-                    self._conflicts.add(obj._p_oid)
-                    raise ReadConflictError(object=obj)
-                invalid = 1
-            else:
-                invalid = 0
-
+            invalid = self._is_invalidated(obj)
             self._reader.setGhostState(obj, p)
             obj._p_serial = serial
-
             if invalid:
-                if obj._p_independent():
-                    self._inv_lock.acquire()
-                    try:
-                        try:
-                            del self._invalidated[oid]
-                        except KeyError:
-                            pass
-                    finally:
-                        self._inv_lock.release()
-                else:
-                    get_transaction().join(self)
-                    raise ConflictError(object=obj)
-
+                self._handle_independent(obj)
         except ConflictError:
             raise
         except:
@@ -210,6 +187,44 @@
             # Add the object to the cache active list
             self._cache.setstate(oid, obj)
 
+    def _is_invalidated(self, obj):
+        # Helper method for setstate() covers three cases:
+        # returns false if obj is valid
+        # returns true if obj was invalidated, but is independent
+        # otherwise, raises ReadConflictError for invalidated objects
+        self._inv_lock.acquire()
+        try:
+            if obj._p_oid in self._invalidated:
+                # Defer _p_independent() call until state is loaded.
+                if hasattr(obj, "_p_independent"):
+                    return True
+                else:
+                    get_transaction().join(self)
+                    self._conflicts.add(obj._p_oid)
+                    raise ReadConflictError(object=obj)
+            else:
+                return False
+        finally:
+            self._inv_lock.release()
+
+    def _handle_independent(self, obj):
+        # Helper method for setstate() handles possibly independent objects
+        # Call _p_independent(), if it returns True, setstate() wins.
+        # Otherwise, raise a ConflictError.
+        
+        if obj._p_independent():
+            self._inv_lock.acquire()
+            try:
+                try:
+                    self._invalidated.remove(obj._p_oid)
+                except KeyError:
+                    pass
+            finally:
+                self._inv_lock.release()
+        else:
+            get_transaction().join(self)
+            raise ReadConflictError(object=obj)
+
     def register(self, object):
         txn = get_transaction()
         L = self._txns.get(txn)
@@ -223,7 +238,7 @@
         return None
 
     ######################################################################
-    # IConnection requires the next three methods:
+    # IConnection requires the next five methods:
     # getVersion(), reset(), cacheGC(), invalidate(), close()
 
     def getVersion(self):
@@ -236,10 +251,12 @@
             # version.
             self._cache.clear()
             self._version = version
-        # This method doesn't acquire the lock, so it shouldn't be called
-        # when the DB is delivering invalidations.
-        self._cache.invalidateMany(self._invalidated)
-        self._invalidated.clear()
+        self._inv_lock.acquire()
+        try:
+            self._cache.invalidateMany(self._invalidated)
+            self._invalidated.clear()
+        finally:
+            self._inv_lock.release()
         self._opened = time.time()
 
     def cacheGC(self):
@@ -252,6 +269,7 @@
         it.  The object data will be actually invalidated at certain
         transaction boundaries.
         """
+
         self._inv_lock.acquire()
         try:
             self._invalidated.update(oids)
@@ -420,9 +438,6 @@
             object._p_oid = oid
             self._created.add(oid)
         elif object._p_changed:
-            if (oid in self._invalidated and
-                not hasattr(object, '_p_resolveConflict')):
-                raise ConflictError(object=object)
             self._modified.add(oid)
         else:
             return # Nothing to do
@@ -437,10 +452,13 @@
         if serial is None:
             self._created.add(oid)
         else:
-            # XXX this seems to duplicate code on objcommit()
-            if (oid in self._invalidated and
-                not hasattr(pobject, '_p_resolveConflict')):
-                raise ConflictError(oid=oid)
+            self._inv_lock.acquire()
+            try:
+                if (oid in self._invalidated and
+                    not hasattr(pobject, '_p_resolveConflict')):
+                    raise ConflictError(oid=oid)
+            finally:
+                self._inv_lock.release()
             self._modified.add(oid)
 
         s = self._storage.store(oid, serial, writer.getState(pobject),