[Zodb-checkins] CVS: ZEO/ZEO - ClientStorage.py:1.35.6.4.2.6
Barry Warsaw
barry@wooz.org
Wed, 15 May 2002 19:47:49 -0400
Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv11828
Modified Files:
Tag: ZEO2-branch
ClientStorage.py
Log Message:
Updates and additions to the threading tests. We specifically want to
make sure:
- that if a storage is closed in one thread, while another thread is
in the middle of a transaction, the second thread will get an
exception when it tries to tpc_finish();
- that if one thread begins a transaction, and a second thread starts
a transaction, the second will block in the tpc_begin() for a while,
getting an exception when a third thread closes the storage;
- that in the situation above, the second thread, which never
successfully got through its tpc_begin() does not get the
transaction lock (I think we're only moderately successful in
testing this situation);
These tests and the accompanying fixes to ClientStorage should knock
off the annoying "AssertionError: notify() of un-acquire()d lock"
errors in the ZEO2 branch.
Changes to ClientStorage include:
- notifyDisconnected(): Don't do the notifyAll(), release() or setting
of self._transaction to None. This is now the responsibility of
tpc_finish() and tpc_abort().
- tpc_abort(), tpc_finish(): Wrap the core logic in a try/finally
which does the proper cleanup that notifyDisconnected() used to do.
- _update_cache(): Catch the ValueError that can happen in the
begin_iterate() call if the backing storage has already been
closed. Transform this into a ClientStorageError, just like the
try/except in the body of the "while 1:" was doing.
=== ZEO/ZEO/ClientStorage.py 1.35.6.4.2.5 => 1.35.6.4.2.6 ===
log2(PROBLEM, "Disconnected from storage")
self._server = disconnected_stub
- if self._transaction:
- self._transaction = None
- self.tpc_cond.notifyAll()
- self.tpc_cond.release()
def __len__(self):
return self._info['length']
@@ -334,13 +330,15 @@
def tpc_abort(self, transaction):
if transaction is not self._transaction:
return
- self._server.tpc_abort(self._serial)
- self._tbuf.clear()
- self._seriald.clear()
- del self._serials[:]
- self._transaction = None
- self.tpc_cond.notify()
- self.tpc_cond.release()
+ try:
+ self._server.tpc_abort(self._serial)
+ self._tbuf.clear()
+ self._seriald.clear()
+ del self._serials[:]
+ finally:
+ self._transaction = None
+ self.tpc_cond.notify()
+ self.tpc_cond.release()
def tpc_begin(self, transaction, tid=None, status=' '):
self.tpc_cond.acquire()
@@ -391,25 +389,31 @@
def tpc_finish(self, transaction, f=None):
if transaction is not self._transaction:
return
- if f is not None:
- f()
-
- self._server.tpc_finish(self._serial)
+ try:
+ if f is not None:
+ f()
- r = self._check_serials()
- assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
+ self._server.tpc_finish(self._serial)
- self._update_cache()
+ r = self._check_serials()
+ assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
- self._transaction = None
- self.tpc_cond.notify()
- self.tpc_cond.release()
+ self._update_cache()
+ finally:
+ self._transaction = None
+ self.tpc_cond.notify()
+ self.tpc_cond.release()
def _update_cache(self):
# Iterate over the objects in the transaction buffer and
# update or invalidate the cache.
self._cache.checkSize(self._tbuf.get_size())
- self._tbuf.begin_iterate()
+ try:
+ self._tbuf.begin_iterate()
+ except ValueError, msg:
+ raise ClientStorageError, (
+ "Unexpected error reading temporary file in "
+ "client storage: %s" % msg)
while 1:
try:
t = self._tbuf.next()