[Zodb-checkins] CVS: ZEO/ZEO - ClientStorage.py:1.35.6.4.2.6

Barry Warsaw barry@wooz.org
Wed, 15 May 2002 19:47:49 -0400


Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv11828

Modified Files:
      Tag: ZEO2-branch
	ClientStorage.py 
Log Message:
Updates and additions to the threading tests.  We specifically want to
make sure:

- that if a storage is closed in one thread, while another thread is
  in the middle of a transaction, the second thread will get an
  exception when it tries to tpc_finish();

- that if one thread begins a transaction, and a second thread starts
  a transaction, the second will block in the tpc_begin() for a while,
  getting an exception when a third thread closes the storage;

- that in the situation above, the second thread, which never
  successfully got through its tpc_begin() does not get the
  transaction lock (I think we're only moderately successful in
  testing this situation);

These tests and the accompanying fixes to ClientStorage should knock
off the annoying "AssertionError: notify() of un-acquire()d lock"
errors in the ZEO2 branch.

Changes to ClientStorage include:

- notifyDisconnected(): Don't do the notifyAll(), release() or setting
  of self._transaction to None.  This is now the responsibility of
  tpc_finish() and tpc_abort().

- tpc_abort(), tpc_finish(): Wrap the core logic in a try/finally
  which does the proper cleanup that notifyDisconnected() used to do.

- _update_cache(): Catch the ValueError that can happen in the
  begin_iterate() call if the backing storage has already been
  closed.  Transform this into a ClientStorageError, just like the
  try/except in the body of the "while 1:" was doing.


=== ZEO/ZEO/ClientStorage.py 1.35.6.4.2.5 => 1.35.6.4.2.6 ===
         log2(PROBLEM, "Disconnected from storage")
         self._server = disconnected_stub
-        if self._transaction:
-            self._transaction = None
-            self.tpc_cond.notifyAll()
-            self.tpc_cond.release()
 
     def __len__(self):
         return self._info['length']
@@ -334,13 +330,15 @@
     def tpc_abort(self, transaction):
         if transaction is not self._transaction:
             return
-        self._server.tpc_abort(self._serial)
-        self._tbuf.clear()
-        self._seriald.clear()
-        del self._serials[:]
-        self._transaction = None
-        self.tpc_cond.notify()
-        self.tpc_cond.release()
+        try:
+            self._server.tpc_abort(self._serial)
+            self._tbuf.clear()
+            self._seriald.clear()
+            del self._serials[:]
+        finally:
+            self._transaction = None
+            self.tpc_cond.notify()
+            self.tpc_cond.release()
 
     def tpc_begin(self, transaction, tid=None, status=' '):
         self.tpc_cond.acquire()
@@ -391,25 +389,31 @@
     def tpc_finish(self, transaction, f=None):
         if transaction is not self._transaction:
             return
-        if f is not None:
-            f()
-
-        self._server.tpc_finish(self._serial)
+        try:
+            if f is not None:
+                f()
 
-        r = self._check_serials()
-        assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
+            self._server.tpc_finish(self._serial)
 
-        self._update_cache()
+            r = self._check_serials()
+            assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
 
-        self._transaction = None
-        self.tpc_cond.notify()
-        self.tpc_cond.release()
+            self._update_cache()
+        finally:
+            self._transaction = None
+            self.tpc_cond.notify()
+            self.tpc_cond.release()
 
     def _update_cache(self):
         # Iterate over the objects in the transaction buffer and
         # update or invalidate the cache.
         self._cache.checkSize(self._tbuf.get_size())
-        self._tbuf.begin_iterate()
+        try:
+            self._tbuf.begin_iterate()
+        except ValueError, msg:
+            raise ClientStorageError, (
+                "Unexpected error reading temporary file in "
+                "client storage: %s" % msg)
         while 1:
             try:
                 t = self._tbuf.next()