[Zope-Checkins] CVS: Zope2 - Connection.py:1.54 DB.py:1.30

jeremy@digicool.com jeremy@digicool.com
Mon, 21 May 2001 18:45:39 -0400 (EDT)


Update of /cvs-repository/Zope2/lib/python/ZODB
In directory korak.digicool.com:/tmp/cvs-serv4229/ZODB

Modified Files:
	Connection.py DB.py 
Log Message:
Fix deadlock problem reported by John D. Heintz.

If one thread was committing a transaction and another thread was
opening a new DB connection, deadlock could occur.  The cause of the
deadlock is that tpc_finish() acquires the storage and db locks in a
different order than DB.open().  As a result, if each starts at the
same time and gets one of the two locks it needs, the system will be
deadlocked.

The solution is to enforce a consistent locking order.  If a thread is
going to hold the DB lock and the storage lock, it MUST acquire the DB
lock first.  This patch implements that locking order for the
invalidation in tpc_finish(). 

The DB object gets methods called begin_invalidation() and
finish_invalidation() that acquire and release the DB lock
respectively.  Before the Connection calls tpc_finish() on the
storage, it calls begin_invalidation().  This guarantees that the DB
acquired before the storage lock.

When the invalidation phase is over, the Connection calls
end_invalidation() to release the DB lock.  This is an optimization.
It could wait until tpc_finish() returns, but we know that the DB will
not be used again for the rest of the tpc_finish() and tpc_finish()
could take a long time.

Specific changes:

DB.py
begin_invalidation(): Added.
finish_invalidation(): Added.
invalidate(): Remove locking.
invalidateMany(): Add comment about how it should be used.

Connection.py
tpc_finish(): Don't pass second argument to storage's tpc_finish()
    when committing a transaction with no data.  Add call to
    begin_invalidation() before calling storage's tpc_finish().
_invalidate_sub(): Remove empty, unnecessary method.
_invalidating_invalidating(): Add call to finish_invalidation() after
    last call to DB's invalidate().






--- Updated File Connection.py in package Zope2/lib/python/ZODB --
--- Connection.py	2001/05/17 18:35:10	1.53
+++ Connection.py	2001/05/21 22:45:38	1.54
@@ -683,18 +683,19 @@
     def tpc_finish(self, transaction):
 
         # It's important that the storage call the function we pass
-        # (self.tpc_finish_) while it still has it's lock.  We don't
-        # want another thread to be able to read any updated data
-        # until we've had a chance to send an invalidation message to
-        # all of the other connections!
+        # (self._invalidate_invalidating) while it still has it's
+        # lock.  We don't want another thread to be able to read any
+        # updated data until we've had a chance to send an
+        # invalidation message to all of the other connections!
 
         if self._tmp is not None:
             # Commiting a subtransaction!
             # There is no need to invalidate anything.
-            self._storage.tpc_finish(transaction, self._invalidate_sub)
+            self._storage.tpc_finish(transaction)
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
+            self._db.begin_invalidation()
             self._storage.tpc_finish(transaction,
                                      self._invalidate_invalidating)
 
@@ -703,13 +704,9 @@
 
     def _invalidate_invalidating(self):
         invalidate=self._db.invalidate
-        for oid in self._invalidating: invalidate(oid, self)
-
-    def _invalidate_sub(self):
-        # There's no point in invalidating any objects in a subtransaction
-        #
-        # Because we may ultimately abort the containing transaction.
-        pass
+        for oid in self._invalidating:
+            invalidate(oid, self)
+        self._db.finish_invalidation()
 
     def sync(self):
         get_transaction().abort()

--- Updated File DB.py in package Zope2/lib/python/ZODB --
--- DB.py	2001/05/17 18:35:10	1.29
+++ DB.py	2001/05/21 22:45:38	1.30
@@ -311,6 +311,16 @@
     def importFile(self, file):
         raise 'Not yet implemented'
 
+    def begin_invalidation(self):
+        # Must be called before first call to invalidate and before
+        # the storage lock is held.
+        self._a()
+
+    def finish_invalidation(self):
+        # Must be called after begin_invalidation() and after final
+        # invalidate() call.
+        self._r()
+
     def invalidate(self, oid, connection=None, version='',
                    rc=sys.getrefcount):
         """Invalidate references to a given oid.
@@ -320,40 +330,37 @@
         passed in to prevent useless (but harmless) messages to the
         connection.
         """
-        if connection is not None: version=connection._version
-        self._a()
-        try:
+        if connection is not None:
+            version=connection._version
+        # Update modified in version cache
+        h=hash(oid)%131
+        o=self._miv_cache.get(h, None)
+        if o is not None and o[0]==oid: del self._miv_cache[h]
+
+        # Notify connections
+        for pool, allocated in self._pools[1]:
+            for cc in allocated:
+                if (cc is not connection and
+                    (not version or cc._version==version)):
+                    if rc(cc) <= 3:
+                        cc.close()
+                    cc.invalidate(oid)
 
-            # Update modified in version cache
-            h=hash(oid)%131
-            cache=self._miv_cache
-            o=cache.get(h, None)
-            if o and o[0]==oid: del cache[h]
-
-            # Notify connections
-            pools,pooll=self._pools
-            for pool, allocated in pooll:
-                for cc in allocated:
+        temps=self._temps
+        if temps:
+            t=[]
+            for cc in temps:
+                if rc(cc) > 3:
                     if (cc is not connection and
                         (not version or cc._version==version)):
-                        if rc(cc) <= 3:
-                            cc.close()
                         cc.invalidate(oid)
-
-            temps=self._temps
-            if temps:
-                t=[]
-                for cc in temps:
-                    if rc(cc) > 3:
-                        if (cc is not connection and
-                            (not version or cc._version==version)):
-                            cc.invalidate(oid)
-                        t.append(cc)
-                    else: cc.close()
-                self._temps=t
-        finally: self._r()
+                    t.append(cc)
+                else: cc.close()
+            self._temps=t
 
     def invalidateMany(self, oids=None, version=''):
+        # XXX Callers of this method need to call begin_invalidation()
+        # and finish_invalidation() to get the right locking 
         if oids is None: self.invalidate(None, version=version)
         else:
             for oid in oids: self.invalidate(oid, version=version)



--- Updated File Connection.py in package Zope2 --
--- Connection.py	2001/05/17 18:35:10	1.53
+++ Connection.py	2001/05/21 22:45:38	1.54
@@ -683,18 +683,19 @@
     def tpc_finish(self, transaction):
 
         # It's important that the storage call the function we pass
-        # (self.tpc_finish_) while it still has it's lock.  We don't
-        # want another thread to be able to read any updated data
-        # until we've had a chance to send an invalidation message to
-        # all of the other connections!
+        # (self._invalidate_invalidating) while it still has it's
+        # lock.  We don't want another thread to be able to read any
+        # updated data until we've had a chance to send an
+        # invalidation message to all of the other connections!
 
         if self._tmp is not None:
             # Commiting a subtransaction!
             # There is no need to invalidate anything.
-            self._storage.tpc_finish(transaction, self._invalidate_sub)
+            self._storage.tpc_finish(transaction)
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
+            self._db.begin_invalidation()
             self._storage.tpc_finish(transaction,
                                      self._invalidate_invalidating)
 
@@ -703,13 +704,9 @@
 
     def _invalidate_invalidating(self):
         invalidate=self._db.invalidate
-        for oid in self._invalidating: invalidate(oid, self)
-
-    def _invalidate_sub(self):
-        # There's no point in invalidating any objects in a subtransaction
-        #
-        # Because we may ultimately abort the containing transaction.
-        pass
+        for oid in self._invalidating:
+            invalidate(oid, self)
+        self._db.finish_invalidation()
 
     def sync(self):
         get_transaction().abort()

--- Updated File DB.py in package Zope2 --
--- DB.py	2001/05/17 18:35:10	1.29
+++ DB.py	2001/05/21 22:45:38	1.30
@@ -311,6 +311,16 @@
     def importFile(self, file):
         raise 'Not yet implemented'
 
+    def begin_invalidation(self):
+        # Must be called before first call to invalidate and before
+        # the storage lock is held.
+        self._a()
+
+    def finish_invalidation(self):
+        # Must be called after begin_invalidation() and after final
+        # invalidate() call.
+        self._r()
+
     def invalidate(self, oid, connection=None, version='',
                    rc=sys.getrefcount):
         """Invalidate references to a given oid.
@@ -320,40 +330,37 @@
         passed in to prevent useless (but harmless) messages to the
         connection.
         """
-        if connection is not None: version=connection._version
-        self._a()
-        try:
+        if connection is not None:
+            version=connection._version
+        # Update modified in version cache
+        h=hash(oid)%131
+        o=self._miv_cache.get(h, None)
+        if o is not None and o[0]==oid: del self._miv_cache[h]
+
+        # Notify connections
+        for pool, allocated in self._pools[1]:
+            for cc in allocated:
+                if (cc is not connection and
+                    (not version or cc._version==version)):
+                    if rc(cc) <= 3:
+                        cc.close()
+                    cc.invalidate(oid)
 
-            # Update modified in version cache
-            h=hash(oid)%131
-            cache=self._miv_cache
-            o=cache.get(h, None)
-            if o and o[0]==oid: del cache[h]
-
-            # Notify connections
-            pools,pooll=self._pools
-            for pool, allocated in pooll:
-                for cc in allocated:
+        temps=self._temps
+        if temps:
+            t=[]
+            for cc in temps:
+                if rc(cc) > 3:
                     if (cc is not connection and
                         (not version or cc._version==version)):
-                        if rc(cc) <= 3:
-                            cc.close()
                         cc.invalidate(oid)
-
-            temps=self._temps
-            if temps:
-                t=[]
-                for cc in temps:
-                    if rc(cc) > 3:
-                        if (cc is not connection and
-                            (not version or cc._version==version)):
-                            cc.invalidate(oid)
-                        t.append(cc)
-                    else: cc.close()
-                self._temps=t
-        finally: self._r()
+                    t.append(cc)
+                else: cc.close()
+            self._temps=t
 
     def invalidateMany(self, oids=None, version=''):
+        # XXX Callers of this method need to call begin_invalidation()
+        # and finish_invalidation() to get the right locking 
         if oids is None: self.invalidate(None, version=version)
         else:
             for oid in oids: self.invalidate(oid, version=version)