[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - TestThread.py:1.2 CommitLockTests.py:1.5
Jeremy Hylton
jeremy@zope.com
Fri, 9 Aug 2002 16:31:45 -0400
Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv13142
Modified Files:
CommitLockTests.py
Added Files:
TestThread.py
Log Message:
Add TestThread class that helps manage threads started by a unittest.
Use TestThread as base class for WorkerThread in CommitLockTests.
=== StandaloneZODB/ZEO/tests/TestThread.py 1.1 => 1.2 ===
--- /dev/null Fri Aug 9 16:31:45 2002
+++ StandaloneZODB/ZEO/tests/TestThread.py Fri Aug 9 16:31:45 2002
@@ -0,0 +1,43 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A Thread base class for use with unittest."""
+
+from cStringIO import StringIO
+import threading
+import traceback
+
+class TestThread(threading.Thread):
+ __super_init = threading.Thread.__init__
+ __super_run = threading.Thread.run
+
+ def __init__(self, testcase, group=None, target=None, name=None,
+ args=(), kwargs={}, verbose=None):
+ self.__super_init(group, target, name, args, kwargs, verbose)
+ self.setDaemon(1)
+ self._testcase = testcase
+
+ def run(self):
+ try:
+ self.testrun()
+ except Exception, err:
+ s = StringIO()
+ traceback.print_exc(file=s)
+ self._testcase.fail("Exception in thread %s:\n%s\n" %
+ (self, s.getvalue()))
+
+ def cleanup(self, timeout=15):
+ self.join(timeout)
+ if self.isAlive():
+ self._testcase.fail("Thread did not finish: %s" % self)
+
=== StandaloneZODB/ZEO/tests/CommitLockTests.py 1.4 => 1.5 ===
--- StandaloneZODB/ZEO/tests/CommitLockTests.py:1.4 Mon Aug 5 18:32:54 2002
+++ StandaloneZODB/ZEO/tests/CommitLockTests.py Fri Aug 9 16:31:45 2002
@@ -13,13 +13,12 @@
##############################################################################
"""Tests of the distributed commit lock."""
-import threading
-
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
+from ZEO.tests.TestThread import TestThread
ZERO = '\0'*8
@@ -27,19 +26,18 @@
def invalidate(self, *args):
pass
-class WorkerThread(threading.Thread):
+class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
- def __init__(self, storage, trans, method="tpc_finish"):
+ def __init__(self, testcase, storage, trans, method="tpc_finish"):
self.storage = storage
self.trans = trans
self.method = method
- threading.Thread.__init__(self)
- self.setDaemon(1)
+ TestThread.__init__(self, testcase)
- def run(self):
+ def testrun(self):
try:
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
@@ -151,15 +149,13 @@
def _dosetup2(self, storage, trans, tid):
self._threads = []
- t = WorkerThread(storage, trans)
+ t = WorkerThread(self, storage, trans)
self._threads.append(t)
t.start()
def _dowork2(self, method_name):
for t in self._threads:
- t.join(10)
- for t in self._threads:
- self.failIf(t.isAlive())
+ t.cleanup()
def _duplicate_client(self):
"Open another ClientStorage to the same server."