[Zope3-checkins]
SVN: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py
More refactoring: move all the output related to running the
tests into
Marius Gedminas
marius at pov.lt
Fri Jul 13 06:56:29 EDT 2007
Log message for revision 77841:
More refactoring: move all the output related to running the tests into
OutputFormatter.
Noticed a couple of potential bugs in other parts of the code and added
XXX/TODO comments.
Changed:
U zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py
-=-
Modified: zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py
===================================================================
--- zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py 2007-07-13 10:52:26 UTC (rev 77840)
+++ zope.testing/branches/output-refactoring/src/zope/testing/testrunner.py 2007-07-13 10:56:28 UTC (rev 77841)
@@ -256,6 +256,54 @@
# resume_layer() reasigns sys.stderr for this reason, but be careful
# and don't store the original one in __init__ or something.
+ max_width = 80
+
+ def __init__(self, options):
+ self.options = options
+ self.last_width = 0
+ self.compute_max_width()
+
+ progress = property(lambda self: self.options.progress)
+ verbose = property(lambda self: self.options.verbose)
+
+ def compute_max_width(self):
+ """Try to determine the terminal width."""
+ if self.progress: # XXX: mgedmin: why not do this always?
+ try:
+ # Note that doing this every time is more test friendly.
+ import curses
+ except ImportError:
+ # avoid reimporting a broken module in python 2.3
+ sys.modules['curses'] = None
+ else:
+ try:
+ curses.setupterm()
+ except TypeError:
+ pass
+ else:
+ self.max_width = curses.tigetnum('cols')
+
+ def getShortDescription(self, test, room):
+ """Return a description of a test that fits in ``room`` characters."""
+ room -= 1
+ s = str(test)
+ if len(s) > room:
+ pos = s.find(" (")
+ if pos >= 0:
+ w = room - (pos + 5)
+ if w < 1:
+ # first portion (test method name) is too long
+ s = s[:room-3] + "..."
+ else:
+ pre = s[:pos+2]
+ post = s[-w:]
+ s = "%s...%s" % (pre, post)
+ else:
+ w = room - 4
+ s = '... ' + s[-w:]
+
+ return ' ' + s[:room]
+
def info(self, message):
"""Print an informative message."""
print message
@@ -264,6 +312,14 @@
"""Report an error."""
print message
+ def error_with_banner(self, message):
+ """Report an error with a big ASCII banner."""
+ print
+ print '*'*70
+ print message
+ print '*'*70
+ print
+
def profiler_stats(self, stats):
"""Report profiler stats."""
stats.print_stats(50)
@@ -328,6 +384,7 @@
The next output operation should be stop_set_up().
"""
print " Set up %s" % layer_name,
+ # TODO: flush sys.stdout
def stop_set_up(self, seconds):
"""Report that we've set up a layer.
@@ -343,6 +400,7 @@
tear_down_not_supported().
"""
print " Tear down %s" % layer_name,
+ # TODO: flush sys.stdout
def stop_tear_down(self, seconds):
"""Report that we've tore down a layer.
@@ -358,7 +416,96 @@
"""
print "... not supported"
+ def start_test(self, test, tests_run, total_tests):
+ """Report that we're about to run a test.
+ The next output operation should be test_success(), test_error(), or
+ test_failure().
+ """
+ self.test_width = 0
+ if self.progress:
+ if self.last_width:
+ sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+
+ s = " %d/%d (%.1f%%)" % (tests_run, total_tests,
+ tests_run * 100.0 / total_tests)
+ sys.stdout.write(s)
+ self.test_width += len(s)
+ if self.verbose == 1:
+ room = self.max_width - self.test_width - 1
+ s = self.getShortDescription(test, room)
+ sys.stdout.write(s)
+ self.test_width += len(s)
+
+ elif self.verbose == 1:
+ sys.stdout.write('.' * test.countTestCases())
+
+ if self.verbose > 1:
+ s = str(test)
+ sys.stdout.write(' ')
+ sys.stdout.write(s)
+ self.test_width += len(s) + 1
+
+ sys.stdout.flush()
+
+ def test_success(self, test, seconds):
+ """Report that a test was successful.
+
+ Should be called right after start_test().
+
+ The next output operation should be stop_test().
+ """
+ if self.verbose > 2:
+ s = " (%.3f s)" % seconds
+ sys.stdout.write(s)
+ self.test_width += len(s) + 1
+
+ def test_error(self, test, seconds, exc_info):
+ """Report that an error occurred while running a test.
+
+ Should be called right after start_test().
+
+ The next output operation should be stop_test().
+ """
+ if self.verbose > 2:
+ print " (%.3f s)" % seconds
+ print
+ self._print_traceback("Error in test %s" % test, exc_info)
+ self.test_width = self.last_width = 0
+
+ def test_failure(self, test, seconds, exc_info):
+ """Report that a test failed.
+
+ Should be called right after start_test().
+
+ The next output operation should be stop_test().
+ """
+ if self.verbose > 2:
+ print " (%.3f s)" % seconds
+ print
+ self._print_traceback("Failure in test %s" % test, exc_info)
+ self.test_width = self.last_width = 0
+
+ def _print_traceback(self, msg, exc_info):
+ # TODO: Inline print_traceback here
+ print_traceback(msg, exc_info)
+
+ def stop_test(self, test):
+ """Clean up the output state after a test."""
+ if self.progress:
+ self.last_width = self.test_width
+ elif self.verbose > 1:
+ print
+ sys.stdout.flush()
+
+ def stop_tests(self):
+ """Clean up the output state after a collection of tests."""
+ if self.progress and self.last_width:
+ sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+ if self.verbose == 1 or self.progress:
+ print
+
+
def run(defaults=None, args=None):
if args is None:
args = sys.argv
@@ -703,10 +850,7 @@
test.__dict__.update(state)
t = time.time() - t
- if options.verbose == 1 or options.progress:
- result.stopTests()
- # XXX: figure out how to move this print into the OutputFormatter
- print
+ output.stop_tests()
failures.extend(result.failures)
errors.extend(result.errors)
output.summary(result.testsRun, len(result.failures), len(result.errors), t)
@@ -867,8 +1011,6 @@
class TestResult(unittest.TestResult):
- max_width = 80
-
def __init__(self, options, tests, layer_name=None):
unittest.TestResult.__init__(self)
self.options = options
@@ -880,48 +1022,11 @@
self.layers = order_by_bases(layers)
else:
self.layers = []
- if options.progress:
- count = 0
- for test in tests:
- count += test.countTestCases()
- self.count = count
- self.last_width = 0
+ count = 0
+ for test in tests:
+ count += test.countTestCases()
+ self.count = count
- if options.progress:
- try:
- # Note that doing this every time is more test friendly.
- import curses
- except ImportError:
- # avoid reimporting a broken module in python 2.3
- sys.modules['curses'] = None
- else:
- try:
- curses.setupterm()
- except TypeError:
- pass
- else:
- self.max_width = curses.tigetnum('cols')
-
- def getShortDescription(self, test, room):
- room -= 1
- s = str(test)
- if len(s) > room:
- pos = s.find(" (")
- if pos >= 0:
- w = room - (pos + 5)
- if w < 1:
- # first portion (test method name) is too long
- s = s[:room-3] + "..."
- else:
- pre = s[:pos+2]
- post = s[-w:]
- s = "%s...%s" % (pre, post)
- else:
- w = room - 4
- s = '... ' + s[-w:]
-
- return ' ' + s[:room]
-
def testSetUp(self):
"""A layer may define a setup method to be called before each
individual test.
@@ -945,103 +1050,47 @@
def startTest(self, test):
self.testSetUp()
unittest.TestResult.startTest(self, test)
- testsRun = self.testsRun - 1
+ testsRun = self.testsRun - 1 # subtract the one the base class added
count = test.countTestCases()
self.testsRun = testsRun + count
- options = self.options
- self.test_width = 0
- # TODO: figure out how to move this to OutputFormatter
- if options.progress:
- if self.last_width:
- sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
+ self.options.output.start_test(test, self.testsRun, self.count)
- s = " %d/%d (%.1f%%)" % (
- self.testsRun, self.count,
- (self.testsRun) * 100.0 / self.count
- )
- sys.stdout.write(s)
- self.test_width += len(s)
- if options.verbose == 1:
- room = self.max_width - self.test_width - 1
- s = self.getShortDescription(test, room)
- sys.stdout.write(s)
- self.test_width += len(s)
-
- elif options.verbose == 1:
- for i in range(count):
- sys.stdout.write('.')
- testsRun += 1
-
- if options.verbose > 1:
- s = str(test)
- sys.stdout.write(' ')
- sys.stdout.write(s)
- self.test_width += len(s) + 1
-
- sys.stdout.flush()
-
self._threads = threading.enumerate()
self._start_time = time.time()
def addSuccess(self, test):
- # TODO: figure out how to move this to OutputFormatter
- if self.options.verbose > 2:
- t = max(time.time() - self._start_time, 0.0)
- s = " (%.3f s)" % t
- sys.stdout.write(s)
- self.test_width += len(s) + 1
+ t = max(time.time() - self._start_time, 0.0)
+ self.options.output.test_success(test, t)
def addError(self, test, exc_info):
- # TODO: figure out how to move this to OutputFormatter
- if self.options.verbose > 2:
- print " (%.3f s)" % (time.time() - self._start_time)
+ self.options.output.test_error(test, time.time() - self._start_time,
+ exc_info)
unittest.TestResult.addError(self, test, exc_info)
- print
- self._print_traceback("Error in test %s" % test, exc_info)
if self.options.post_mortem:
if self.options.resume_layer:
- print
- print '*'*70
- print ("Can't post-mortem debug when running a layer"
- " as a subprocess!")
- print '*'*70
- print
+ self.options.output.error_with_banner("Can't post-mortem debug"
+ " when running a layer"
+ " as a subprocess!")
else:
post_mortem(exc_info)
- self.test_width = self.last_width = 0
-
def addFailure(self, test, exc_info):
- # TODO: figure out how to move this to OutputFormatter
+ self.options.output.test_failure(test, time.time() - self._start_time,
+ exc_info)
-
- if self.options.verbose > 2:
- print " (%.3f s)" % (time.time() - self._start_time)
-
unittest.TestResult.addFailure(self, test, exc_info)
- print
- self._print_traceback("Failure in test %s" % test, exc_info)
if self.options.post_mortem:
+ # XXX: mgedmin: why isn't there a resume_layer check here like
+ # in addError?
post_mortem(exc_info)
- self.test_width = self.last_width = 0
-
-
- def stopTests(self):
- # TODO: figure out how to move this to OutputFormatter
- if self.options.progress and self.last_width:
- sys.stdout.write('\r' + (' ' * self.last_width) + '\r')
-
def stopTest(self, test):
self.testTearDown()
- if self.options.progress:
- self.last_width = self.test_width
- elif self.options.verbose > 1:
- print
+ self.options.output.stop_test(test)
# TODO: figure out how to move this to OutputFormatter
if gc.garbage:
@@ -1061,13 +1110,7 @@
print test
print "New thread(s):", new_threads
- sys.stdout.flush()
-
- def _print_traceback(self, msg, exc_info):
- # TODO: figure out how to move this to OutputFormatter
- print_traceback(msg, exc_info)
-
doctest_template = """
File "%s", line %s, in %s
@@ -1092,7 +1135,6 @@
def print_traceback(msg, exc_info):
- # TODO: figure out how to move this to OutputFormatter
print
print msg
@@ -1982,7 +2024,7 @@
merge_options(options, defaults)
options.original_testrunner_args = original_testrunner_args
- options.output = OutputFormatter()
+ options.output = OutputFormatter(options)
options.fail = False
More information about the Zope3-Checkins
mailing list