[Zope-Checkins] CVS: Zope2 - Connection.py:1.54 DB.py:1.30
jeremy@digicool.com
jeremy@digicool.com
Mon, 21 May 2001 18:45:39 -0400 (EDT)
Update of /cvs-repository/Zope2/lib/python/ZODB
In directory korak.digicool.com:/tmp/cvs-serv4229/ZODB
Modified Files:
Connection.py DB.py
Log Message:
Fix deadlock problem reported by John D. Heintz.
If one thread was committing a transaction and another thread was
opening a new DB connection, deadlock could occur. The cause of the
deadlock is that tpc_finish() acquires the storage and db locks in a
different order than DB.open(). As a result, if each starts at the
same time and gets one of the two locks it needs, the system will be
deadlocked.
The solution is to enforce a consistent locking order. If a thread is
going to hold the DB lock and the storage lock, it MUST acquire the DB
lock first. This patch implements that locking order for the
invalidation in tpc_finish().
The DB object gets methods called begin_invalidation() and
finish_invalidation() that acquire and release the DB lock
respectively. Before the Connection calls tpc_finish() on the
storage, it calls begin_invalidation(). This guarantees that the DB
acquired before the storage lock.
When the invalidation phase is over, the Connection calls
end_invalidation() to release the DB lock. This is an optimization.
It could wait until tpc_finish() returns, but we know that the DB will
not be used again for the rest of the tpc_finish() and tpc_finish()
could take a long time.
Specific changes:
DB.py
begin_invalidation(): Added.
finish_invalidation(): Added.
invalidate(): Remove locking.
invalidateMany(): Add comment about how it should be used.
Connection.py
tpc_finish(): Don't pass second argument to storage's tpc_finish()
when committing a transaction with no data. Add call to
begin_invalidation() before calling storage's tpc_finish().
_invalidate_sub(): Remove empty, unnecessary method.
_invalidating_invalidating(): Add call to finish_invalidation() after
last call to DB's invalidate().
--- Updated File Connection.py in package Zope2/lib/python/ZODB --
--- Connection.py 2001/05/17 18:35:10 1.53
+++ Connection.py 2001/05/21 22:45:38 1.54
@@ -683,18 +683,19 @@
def tpc_finish(self, transaction):
# It's important that the storage call the function we pass
- # (self.tpc_finish_) while it still has it's lock. We don't
- # want another thread to be able to read any updated data
- # until we've had a chance to send an invalidation message to
- # all of the other connections!
+ # (self._invalidate_invalidating) while it still has it's
+ # lock. We don't want another thread to be able to read any
+ # updated data until we've had a chance to send an
+ # invalidation message to all of the other connections!
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
- self._storage.tpc_finish(transaction, self._invalidate_sub)
+ self._storage.tpc_finish(transaction)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
+ self._db.begin_invalidation()
self._storage.tpc_finish(transaction,
self._invalidate_invalidating)
@@ -703,13 +704,9 @@
def _invalidate_invalidating(self):
invalidate=self._db.invalidate
- for oid in self._invalidating: invalidate(oid, self)
-
- def _invalidate_sub(self):
- # There's no point in invalidating any objects in a subtransaction
- #
- # Because we may ultimately abort the containing transaction.
- pass
+ for oid in self._invalidating:
+ invalidate(oid, self)
+ self._db.finish_invalidation()
def sync(self):
get_transaction().abort()
--- Updated File DB.py in package Zope2/lib/python/ZODB --
--- DB.py 2001/05/17 18:35:10 1.29
+++ DB.py 2001/05/21 22:45:38 1.30
@@ -311,6 +311,16 @@
def importFile(self, file):
raise 'Not yet implemented'
+ def begin_invalidation(self):
+ # Must be called before first call to invalidate and before
+ # the storage lock is held.
+ self._a()
+
+ def finish_invalidation(self):
+ # Must be called after begin_invalidation() and after final
+ # invalidate() call.
+ self._r()
+
def invalidate(self, oid, connection=None, version='',
rc=sys.getrefcount):
"""Invalidate references to a given oid.
@@ -320,40 +330,37 @@
passed in to prevent useless (but harmless) messages to the
connection.
"""
- if connection is not None: version=connection._version
- self._a()
- try:
+ if connection is not None:
+ version=connection._version
+ # Update modified in version cache
+ h=hash(oid)%131
+ o=self._miv_cache.get(h, None)
+ if o is not None and o[0]==oid: del self._miv_cache[h]
+
+ # Notify connections
+ for pool, allocated in self._pools[1]:
+ for cc in allocated:
+ if (cc is not connection and
+ (not version or cc._version==version)):
+ if rc(cc) <= 3:
+ cc.close()
+ cc.invalidate(oid)
- # Update modified in version cache
- h=hash(oid)%131
- cache=self._miv_cache
- o=cache.get(h, None)
- if o and o[0]==oid: del cache[h]
-
- # Notify connections
- pools,pooll=self._pools
- for pool, allocated in pooll:
- for cc in allocated:
+ temps=self._temps
+ if temps:
+ t=[]
+ for cc in temps:
+ if rc(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
- if rc(cc) <= 3:
- cc.close()
cc.invalidate(oid)
-
- temps=self._temps
- if temps:
- t=[]
- for cc in temps:
- if rc(cc) > 3:
- if (cc is not connection and
- (not version or cc._version==version)):
- cc.invalidate(oid)
- t.append(cc)
- else: cc.close()
- self._temps=t
- finally: self._r()
+ t.append(cc)
+ else: cc.close()
+ self._temps=t
def invalidateMany(self, oids=None, version=''):
+ # XXX Callers of this method need to call begin_invalidation()
+ # and finish_invalidation() to get the right locking
if oids is None: self.invalidate(None, version=version)
else:
for oid in oids: self.invalidate(oid, version=version)
--- Updated File Connection.py in package Zope2 --
--- Connection.py 2001/05/17 18:35:10 1.53
+++ Connection.py 2001/05/21 22:45:38 1.54
@@ -683,18 +683,19 @@
def tpc_finish(self, transaction):
# It's important that the storage call the function we pass
- # (self.tpc_finish_) while it still has it's lock. We don't
- # want another thread to be able to read any updated data
- # until we've had a chance to send an invalidation message to
- # all of the other connections!
+ # (self._invalidate_invalidating) while it still has it's
+ # lock. We don't want another thread to be able to read any
+ # updated data until we've had a chance to send an
+ # invalidation message to all of the other connections!
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
- self._storage.tpc_finish(transaction, self._invalidate_sub)
+ self._storage.tpc_finish(transaction)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
+ self._db.begin_invalidation()
self._storage.tpc_finish(transaction,
self._invalidate_invalidating)
@@ -703,13 +704,9 @@
def _invalidate_invalidating(self):
invalidate=self._db.invalidate
- for oid in self._invalidating: invalidate(oid, self)
-
- def _invalidate_sub(self):
- # There's no point in invalidating any objects in a subtransaction
- #
- # Because we may ultimately abort the containing transaction.
- pass
+ for oid in self._invalidating:
+ invalidate(oid, self)
+ self._db.finish_invalidation()
def sync(self):
get_transaction().abort()
--- Updated File DB.py in package Zope2 --
--- DB.py 2001/05/17 18:35:10 1.29
+++ DB.py 2001/05/21 22:45:38 1.30
@@ -311,6 +311,16 @@
def importFile(self, file):
raise 'Not yet implemented'
+ def begin_invalidation(self):
+ # Must be called before first call to invalidate and before
+ # the storage lock is held.
+ self._a()
+
+ def finish_invalidation(self):
+ # Must be called after begin_invalidation() and after final
+ # invalidate() call.
+ self._r()
+
def invalidate(self, oid, connection=None, version='',
rc=sys.getrefcount):
"""Invalidate references to a given oid.
@@ -320,40 +330,37 @@
passed in to prevent useless (but harmless) messages to the
connection.
"""
- if connection is not None: version=connection._version
- self._a()
- try:
+ if connection is not None:
+ version=connection._version
+ # Update modified in version cache
+ h=hash(oid)%131
+ o=self._miv_cache.get(h, None)
+ if o is not None and o[0]==oid: del self._miv_cache[h]
+
+ # Notify connections
+ for pool, allocated in self._pools[1]:
+ for cc in allocated:
+ if (cc is not connection and
+ (not version or cc._version==version)):
+ if rc(cc) <= 3:
+ cc.close()
+ cc.invalidate(oid)
- # Update modified in version cache
- h=hash(oid)%131
- cache=self._miv_cache
- o=cache.get(h, None)
- if o and o[0]==oid: del cache[h]
-
- # Notify connections
- pools,pooll=self._pools
- for pool, allocated in pooll:
- for cc in allocated:
+ temps=self._temps
+ if temps:
+ t=[]
+ for cc in temps:
+ if rc(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
- if rc(cc) <= 3:
- cc.close()
cc.invalidate(oid)
-
- temps=self._temps
- if temps:
- t=[]
- for cc in temps:
- if rc(cc) > 3:
- if (cc is not connection and
- (not version or cc._version==version)):
- cc.invalidate(oid)
- t.append(cc)
- else: cc.close()
- self._temps=t
- finally: self._r()
+ t.append(cc)
+ else: cc.close()
+ self._temps=t
def invalidateMany(self, oids=None, version=''):
+ # XXX Callers of this method need to call begin_invalidation()
+ # and finish_invalidation() to get the right locking
if oids is None: self.invalidate(None, version=version)
else:
for oid in oids: self.invalidate(oid, version=version)