mirror of
https://github.com/AsahiLinux/u-boot
synced 2024-11-25 14:10:43 +00:00
ebcaafcded
While running tests for a python tool, the tests' outputs get printed in whatever order they happen to run, without any indication as to which output belongs to which test. Unittest supports capturing these outputs and printing them as part of the test summaries, but when a failure or error occurs it switches back to printing as the tests run. Testtools and subunit tests can do the same as their parts inherit from unittest, but they don't outright expose this functionality. On the unittest side, enable output buffering for the custom test result class. Try to avoid ugly outputs by not printing stdout/stderr before the test summary for low verbosity levels and for successful tests. On the subunit side, implement a custom TestProtocolClient that enables the same underlying functionality and injects the captured streams as additional test details. This causes them to be merged into their test's error traceback message, which is later rebuilt into an exception and passed to our unittest report class. Signed-off-by: Alper Nebi Yasak <alpernebiyasak@gmail.com> Reviewed-by: Simon Glass <sjg@chromium.org>
221 lines
8.1 KiB
Python
221 lines
8.1 KiB
Python
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: GPL-2.0+
|
|
#
|
|
# Modified by: Corey Goldberg, 2013
|
|
#
|
|
# Original code from:
|
|
# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
|
|
# Copyright (C) 2005-2011 Canonical Ltd
|
|
|
|
"""Python testtools extension for running unittest suites concurrently.
|
|
|
|
The `testtools` project provides a ConcurrentTestSuite class, but does
|
|
not provide a `make_tests` implementation needed to use it.
|
|
|
|
This allows you to parallelize a test run across a configurable number
|
|
of worker processes. While this can speed up CPU-bound test runs, it is
|
|
mainly useful for IO-bound tests that spend most of their time waiting for
|
|
data to arrive from someplace else and can benefit from concurrency.
|
|
|
|
Unix only.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import traceback
|
|
import unittest
|
|
from itertools import cycle
|
|
from multiprocessing import cpu_count
|
|
|
|
from subunit import ProtocolTestCase, TestProtocolClient
|
|
from subunit.test_results import AutoTimingTestResultDecorator
|
|
|
|
from testtools import ConcurrentTestSuite, iterate_tests
|
|
from testtools.content import TracebackContent, text_content
|
|
|
|
|
|
# Public API of this module. The name must be spelled `__all__` (two
# leading underscores) for Python to honour it in `from module import *`;
# the previous single-underscore spelling was silently ignored.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Number of CPUs reported by multiprocessing; used as the default value of
# fork_for_tests()'s concurrency_num argument.
CPU_COUNT = cpu_count()
|
|
|
|
|
|
class BufferingTestProtocolClient(TestProtocolClient):
    """A TestProtocolClient which can buffer the test outputs

    This class captures the stdout and stderr output streams of the
    tests as it runs them, and includes the output texts in the subunit
    stream as additional details.

    Args:
        stream: A file-like object to write a subunit stream to
        buffer (bool): True to capture test stdout/stderr outputs and
            include them in the test details
    """
    def __init__(self, stream, buffer=True):
        super().__init__(stream)
        # NOTE(review): presumably the inherited unittest result machinery
        # reads this attribute and swaps sys.stdout/sys.stderr for in-memory
        # buffers while a test runs - confirm against unittest.TestResult.
        self.buffer = buffer

    def _addOutcome(self, outcome, test, error=None, details=None,
                    error_permitted=True):
        """Report a test outcome to the subunit stream

        The parent class uses this function as a common implementation
        for various methods that report successes, errors, failures, etc.

        This version automatically upgrades the error tracebacks to the
        new 'details' format by wrapping them in a Content object, so
        that we can include the captured test output in the test result
        details.

        Args:
            outcome: A string describing the outcome - used as the
                event name in the subunit stream.
            test: The test case whose outcome is to be reported
            error: Standard unittest positional argument form - an
                exc_info tuple.
            details: New Testing-in-python drafted API; a dict from
                string to subunit.Content objects. The dict passed in
                is not modified; a copy is extended and forwarded.
            error_permitted: If True then one and only one of error or
                details must be supplied. If False then error must not
                be supplied and details is still optional.

        Returns:
            Whatever the parent class's _addOutcome() returns.
        """
        # Work on a private copy so that adding the traceback and the
        # captured output never leaks into a dict owned by the caller.
        details = {} if details is None else dict(details)

        # Parent will raise an exception if error_permitted is False but
        # error is not None. We want that exception in that case, so
        # don't touch error when error_permitted is explicitly False.
        if error_permitted and error is not None:
            # Parent class prefers error over details
            details['traceback'] = TracebackContent(error, test)
            error_permitted = False
            error = None

        if self.buffer:
            # The streams are only capture buffers while output capturing
            # is active. If an outcome is reported while the real streams
            # are installed (or a stream is None), there is nothing to
            # read back, so skip it instead of raising AttributeError.
            for name, captured in (('stdout', sys.stdout),
                                   ('stderr', sys.stderr)):
                getvalue = getattr(captured, 'getvalue', None)
                text = getvalue() if getvalue is not None else ''
                if text:
                    details[name] = text_content(text)

        return super()._addOutcome(outcome, test, error=error,
                                   details=details,
                                   error_permitted=error_permitted)
|
|
|
|
|
|
def fork_for_tests(concurrency_num=CPU_COUNT, buffer=False):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.
    :param buffer: if True, each worker reports through a
        BufferingTestProtocolClient so that captured stdout/stderr is
        attached to the test details; if False a plain TestProtocolClient
        is used.

    :return: a function taking a TestSuite and returning a list of
        TestCase-like objects, one per forked worker process.
    """
    if buffer:
        test_protocol_client_class = BufferingTestProtocolClient
    else:
        test_protocol_client_class = TestProtocolClient

    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: run this partition and report over the pipe.
                # Pre-bind stream so the error path can tell whether the
                # pipe was ever wrapped successfully.
                stream = None
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise it's a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        test_protocol_client_class(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() skips normal interpreter cleanup, so push
                    # out anything still sitting in the write buffer.
                    stream.flush()
                except BaseException:
                    # Deliberately catches everything: the child must never
                    # return into the parent's code path.
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        if stream is not None:
                            # The pipe is opened in binary mode, so the
                            # traceback text has to be encoded first.
                            stream.write(
                                traceback.format_exc().encode(
                                    'utf-8', 'replace'))
                            stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent: keep only the read end of the pipe.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                # If we don't pass the second argument here, it defaults
                # to sys.stdout.buffer down the line. But if we don't
                # pass it *now*, it may be resolved after sys.stdout is
                # replaced with a StringIO (to capture tests' outputs)
                # which doesn't have a buffer attribute and can end up
                # occasionally causing a 'broken-runner' error.
                test = ProtocolTestCase(stream, sys.stdout.buffer)
                result.append(test)
        return result
    return do_fork
|
|
|
|
|
|
def partition_tests(suite, count):
    """Split the tests of suite into count lists, dealt out in turn.

    Tests are handed out round-robin: the first test goes to the first
    list, the second to the second list, and so on, wrapping around.
    Dealing them out like this separates neighbouring tests that might
    have run faster together by sharing resources, but it also keeps a
    run of slow tests from piling up in a single list, so the slowest
    list should not end up much slower than the fastest.

    If count is less than 1 an empty list is returned and no test is
    assigned anywhere.
    """
    buckets = [[] for _ in range(count)]
    all_tests = iterate_tests(suite)
    if buckets:
        bucket_total = len(buckets)
        for position, test in enumerate(all_tests):
            buckets[position % bucket_total].append(test)
    return buckets
|
|
|
|
|
|
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Placeholder tests for the demo; each one only sleeps."""

        def _pause(self):
            # Stand-in for a test that spends its time waiting
            time.sleep(0.5)

        def test_me_1(self):
            self._pause()

        def test_me_2(self):
            self._pause()

        def test_me_3(self):
            self._pause()

        def test_me_4(self):
            self._pause()

    def _fresh_suite():
        # Build a new suite each time: running through fork_for_tests
        # empties the suite it is given.
        return unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)

    text_runner = unittest.TextTestRunner()

    # First pass: one test after another in this process
    text_runner.run(_fresh_suite())

    # Second pass: the same tests spread over 4 forked processes
    text_runner.run(ConcurrentTestSuite(_fresh_suite(), fork_for_tests(4)))
|