2018-10-02 03:12:47 +00:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# SPDX-License-Identifier: GPL-2.0+
|
|
|
|
#
|
|
|
|
# Modified by: Corey Goldberg, 2013
|
|
|
|
#
|
|
|
|
# Original code from:
|
|
|
|
# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
|
|
|
|
# Copyright (C) 2005-2011 Canonical Ltd
|
|
|
|
|
|
|
|
"""Python testtools extension for running unittest suites concurrently.
|
|
|
|
|
|
|
|
The `testtools` project provides a ConcurrentTestSuite class, but does
|
|
|
|
not provide a `make_tests` implementation needed to use it.
|
|
|
|
|
|
|
|
This allows you to parallelize a test run across a configurable number
|
|
|
|
of worker processes. While this can speed up CPU-bound test runs, it is
|
|
|
|
mainly useful for IO-bound tests that spend most of their time waiting for
|
|
|
|
data to arrive from someplace else and can benefit from concurrency.
|
|
|
|
|
|
|
|
Unix only.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import traceback
|
|
|
|
import unittest
|
|
|
|
from itertools import cycle
|
|
|
|
from multiprocessing import cpu_count
|
|
|
|
|
|
|
|
from subunit import ProtocolTestCase, TestProtocolClient
|
|
|
|
from subunit.test_results import AutoTimingTestResultDecorator
|
|
|
|
|
|
|
|
from testtools import ConcurrentTestSuite, iterate_tests
|
2022-04-02 17:06:08 +00:00
|
|
|
from testtools.content import TracebackContent, text_content
|
2018-10-02 03:12:47 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module. Note: must be spelled __all__ (double leading
# and trailing underscores) for `from module import *` to honor it; the
# previous `_all__` spelling was an inert typo.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
|
|
|
|
|
|
|
|
|
|
|
|
CPU_COUNT = cpu_count()  # default worker-process count: one per available CPU
|
|
|
|
|
|
|
|
|
2022-04-02 17:06:08 +00:00
|
|
|
class BufferingTestProtocolClient(TestProtocolClient):
    """A TestProtocolClient that folds captured test output into results.

    Text written to stdout/stderr while a test runs is read back (the
    streams are assumed to have been replaced with buffer objects that
    expose ``getvalue()``, e.g. StringIO) and attached to the subunit
    stream as extra detail entries.

    Args:
        stream: A file-like object to write a subunit stream to
        buffer (bool): True to capture test stdout/stderr outputs and
            include them in the test details
    """
    def __init__(self, stream, buffer=True):
        super().__init__(stream)
        # Whether to attach captured stdout/stderr to each reported outcome.
        self.buffer = buffer

    def _addOutcome(self, outcome, test, error=None, details=None,
            error_permitted=True):
        """Report a single test outcome on the subunit stream.

        The parent class funnels its success/error/failure reporting
        methods through this hook. This override promotes old-style
        exc_info tracebacks into the newer 'details' dict form (as a
        Content object) so the captured test output can be included in
        the same result details.

        Args:
            outcome: A string describing the outcome - used as the
                event name in the subunit stream.
            test: The test case whose outcome is to be reported
            error: Standard unittest positional argument form - an
                exc_info tuple.
            details: New Testing-in-python drafted API; a dict from
                string to subunit.Content objects.
            error_permitted: If True then one and only one of error or
                details must be supplied. If False then error must not
                be supplied and details is still optional.
        """
        details = {} if details is None else details

        # If error_permitted is False yet error is set, the parent class
        # raises - leave both arguments alone so that still happens.
        # Otherwise fold the traceback into details (the parent would
        # prefer error over details if we passed both).
        if error_permitted and error is not None:
            details['traceback'] = TracebackContent(error, test)
            error_permitted = False
            error = None

        if self.buffer:
            # sys.stdout/sys.stderr are expected to be StringIO-like
            # capture buffers at this point.
            captured_out = sys.stdout.getvalue()
            if captured_out:
                details['stdout'] = text_content(captured_out)

            captured_err = sys.stderr.getvalue()
            if captured_err:
                details['stderr'] = text_content(captured_err)

        return super()._addOutcome(outcome, test, error=error,
                details=details, error_permitted=error_permitted)
|
|
|
|
|
|
|
|
|
|
|
|
def fork_for_tests(concurrency_num=CPU_COUNT, buffer=False):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of worker processes to use.
    :param buffer: if True, capture each test's stdout/stderr output and
        attach it to the test details (via BufferingTestProtocolClient).
    :return: a callable suitable as the `make_tests` argument of
        `ConcurrentTestSuite`.
    """
    if buffer:
        test_protocol_client_class = BufferingTestProtocolClient
    else:
        test_protocol_client_class = TestProtocolClient

    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: run its share of the tests, streaming results
                # back to the parent over the pipe as a subunit stream.
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise its a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        test_protocol_client_class(stream)
                    )
                    process_suite.run(subunit_result)
                except:
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children. The stream was opened in
                    # binary mode, so the text must be encoded first -
                    # writing str to it would raise TypeError and the
                    # traceback would be silently lost.
                    try:
                        stream.write(traceback.format_exc().encode('utf-8'))
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent: keep the read end and wrap it in a TestCase-like
                # adapter that replays the child's subunit stream.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                # If we don't pass the second argument here, it defaults
                # to sys.stdout.buffer down the line. But if we don't
                # pass it *now*, it may be resolved after sys.stdout is
                # replaced with a StringIO (to capture tests' outputs)
                # which doesn't have a buffer attribute and can end up
                # occasionally causing a 'broken-runner' error.
                test = ProtocolTestCase(stream, sys.stdout.buffer)
                result.append(test)
        return result
    return do_fork
|
|
|
|
|
|
|
|
|
|
|
|
def partition_tests(suite, count):
    """Partition suite into count lists of tests."""
    # Round-robin assignment. This scatters blocks of related tests that
    # might run faster sharing resources, but it also prevents a cluster
    # of slow tests from landing in one partition - so the slowest
    # partition shouldn't be much slower than the fastest.
    partitions = [[] for _ in range(count)]
    for index, test in enumerate(iterate_tests(suite)):
        partitions[index % count].append(test)
    return partitions
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Dummy tests that sleep for demo."""

        def test_me_1(self):
            time.sleep(0.5)

        def test_me_2(self):
            time.sleep(0.5)

        def test_me_3(self):
            time.sleep(0.5)

        def test_me_4(self):
            time.sleep(0.5)

    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner()

    # Baseline: run the demo suite sequentially.
    runner.run(loader.loadTestsFromTestCase(SampleTestCase))

    # Then run the same tests split across 4 forked worker processes.
    parallel_suite = ConcurrentTestSuite(
        loader.loadTestsFromTestCase(SampleTestCase), fork_for_tests(4))
    runner.run(parallel_suite)
|