[Zodb-checkins] CVS: ZEO/ZEO/tests - ThreadTests.py:1.1.2.2

Barry Warsaw barry@wooz.org
Wed, 15 May 2002 19:44:32 -0400


Update of /cvs-repository/ZEO/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv10372

Modified Files:
      Tag: ZEO2-branch
	ThreadTests.py 
Log Message:
Updates and additions to the threading tests.  We specifically want to
make sure:

- that if a storage is closed in one thread, while another thread is
  in the middle of a transaction, the second thread will get an
  exception when it tries to tpc_finish();

- that if one thread begins a transaction, and a second thread starts
  a transaction, the second will block in the tpc_begin() for a while,
  getting an exception when a third thread closes the storage;

- that in the situation above, the second thread, which never
  successfully got through its tpc_begin() does not get the
  transaction lock (I think we're only moderately successful in
  testing this situation);

These tests and the accompanying fixes to ClientStorage should knock
off the annoying "AssertionError: notify() of un-acquire()d lock"
errors in the ZEO2 thread.


=== ZEO/ZEO/tests/ThreadTests.py 1.1.2.1 => 1.1.2.2 ===
 ZERO = '\0'*8
 
-class DummyDB:
-    def invalidate(self, *args):
-        pass
-
-
-class GetsThroughVoteThread(threading.Thread):
-    # run the entire test in a thread so that the blocking call for
-    # tpc_vote() doesn't hang the test suite.
-
-    def __init__(self, storage, doCloseEvent, threadStartedEvent):
+class BasicThread(threading.Thread):
+    def __init__(self, storage, doNextEvent, threadStartedEvent):
         self.storage = storage
         self.trans = Transaction()
-        self.doCloseEvent = doCloseEvent
+        self.doNextEvent = doNextEvent
         self.threadStartedEvent = threadStartedEvent
+        self.gotValueError = 0
         self.gotDisconnected = 0
         threading.Thread.__init__(self)
 
+
+class GetsThroughVoteThread(BasicThread):
+    # This thread gets partially through a transaction before it turns
+    # execution over to another thread.  We're trying to establish that a
+    # tpc_finish() after a storage has been closed by another thread will get
+    # a ClientStorageError error.
+    #
+    # This class gets does a tpc_begin(), store(), tpc_vote() and is waiting
+    # to do the tpc_finish() when the other thread closes the storage.
     def run(self):
         self.storage.tpc_begin(self.trans)
         oid = self.storage.new_oid()
         self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
         self.storage.tpc_vote(self.trans)
         self.threadStartedEvent.set()
-        self.doCloseEvent.wait(10)
+        self.doNextEvent.wait(10)
         try:
             self.storage.tpc_finish(self.trans)
+        except ZEO.ClientStorage.ClientStorageError:
+            self.gotValueError = 1
+            self.storage.tpc_abort(self.trans)
+
+
+class GetsThroughBeginThread(BasicThread):
+    # This class is like the above except that it is intended to be run when
+    # another thread is already in a tpc_begin().  Thus, this thread will
+    # block in the tpc_begin until another thread closes the storage.  When
+    # that happens, this one will get disconnected too.
+    def run(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+        except ZEO.ClientStorage.ClientStorageError:
+            self.gotValueError = 1
+
+
+class AbortsAfterBeginFailsThread(BasicThread):
+    # This class is identical to GetsThroughBeginThread except that it
+    # attempts to tpc_abort() after the tpc_begin() fails.  That will raise a
+    # ClientDisconnected exception which implies that we don't have the lock,
+    # and that's what we really want to test (but it's difficult given the
+    # threading module's API).
+    def run(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+        except ZEO.ClientStorage.ClientStorageError:
+            self.gotValueError = 1
+        try:
+            self.storage.tpc_abort(self.trans)
         except Disconnected:
             self.gotDisconnected = 1
 
 
 class ThreadTests:
-
     # Thread 1 should start a transaction, but not get all the way through it.
     # Main thread should close the connection.  Thread 1 should then get
     # disconnected.
-
     def checkDisconnectedOnThread2Close(self):
-        doCloseEvent = threading.Event()
+        doNextEvent = threading.Event()
         threadStartedEvent = threading.Event()
         thread1 = GetsThroughVoteThread(self._storage,
-                                        doCloseEvent, threadStartedEvent)
+                                        doNextEvent, threadStartedEvent)
         thread1.start()
         threadStartedEvent.wait(10)
         self._storage.close()
-        doCloseEvent.set()
+        doNextEvent.set()
+        thread1.join()
+        self.assertEqual(thread1.gotValueError, 1)
+
+    # Thread 1 should start a transaction, but not get all the way through
+    # it.  While thread 1 is in the middle of the transaction, a second thread
+    # should start a transaction, and it will block in the tcp_begin() --
+    # because thread 1 has acquired the lock in its tpc_begin().  Now the main
+    # thread closes the storage and both sub-threads should get disconnected.
+    def checkSecondBeginFails(self):
+        doNextEvent = threading.Event()
+        threadStartedEvent = threading.Event()
+        thread1 = GetsThroughVoteThread(self._storage,
+                                        doNextEvent, threadStartedEvent)
+        thread2 = GetsThroughBeginThread(self._storage,
+                                         doNextEvent, threadStartedEvent)
+        thread1.start()
+        threadStartedEvent.wait(1)
+        thread2.start()
+        self._storage.close()
+        doNextEvent.set()
+        thread1.join()
+        thread2.join()
+        self.assertEqual(thread1.gotValueError, 1)
+        self.assertEqual(thread2.gotValueError, 1)
+
+    def checkThatFailedBeginDoesNotHaveLock(self):
+        doNextEvent = threading.Event()
+        threadStartedEvent = threading.Event()
+        thread1 = GetsThroughVoteThread(self._storage,
+                                        doNextEvent, threadStartedEvent)
+        thread2 = AbortsAfterBeginFailsThread(self._storage,
+                                              doNextEvent, threadStartedEvent)
+        thread1.start()
+        threadStartedEvent.wait(1)
+        thread2.start()
+        self._storage.close()
+        doNextEvent.set()
         thread1.join()
-        self.assertEqual(thread1.gotDisconnected, 1)
+        thread2.join()
+        self.assertEqual(thread1.gotValueError, 1)
+        self.assertEqual(thread2.gotValueError, 1)
+        self.assertEqual(thread2.gotDisconnected, 1)