ParallelTestSuite
Run a series of tests in parallel in several processes.
Attributes
| Attribute | Type | Description |
|---|---|---|
| init_worker | callable = _init_worker | The function used to initialize each worker process in the multiprocessing pool. |
| process_setup | callable = _process_setup_stub | A hook function executed within each worker process to perform environment-specific setup. |
| process_setup_args | tuple = () | A tuple of arguments passed to the process_setup function during worker initialization. |
| run_subsuite | callable = _run_subsuite | The function responsible for executing an individual test subsuite within a worker process. |
| runner_class | class = RemoteTestRunner | The test runner class instantiated within worker processes to execute subsuites and collect results. |
Constructor
Signature
def ParallelTestSuite(
subsuites: list,
processes: int,
failfast: bool = False,
debug_mode: bool = False,
buffer: bool = False
) - > null
Parameters
| Name | Type | Description |
|---|---|---|
| subsuites | list | A collection of test suites to be executed in parallel. |
| processes | int | The number of worker processes to use for parallel execution. |
| failfast | bool = False | Whether to stop the test run on the first error or failure. |
| debug_mode | bool = False | Flag to enable debug mode during test execution. |
| buffer | bool = False | Whether to buffer stdout and stderr during test execution. |
Methods
run()
@classmethod
def run(
result: unittest.TestResult
) - > unittest.TestResult
Distribute TestCases across workers. Return an identifier of each TestCase with its result in order to use imap_unordered to show results as soon as they're available.
Parameters
| Name | Type | Description |
|---|---|---|
| result | unittest.TestResult | The test result object to which the outcomes of the executed tests will be reported. |
Returns
| Type | Description |
|---|---|
unittest.TestResult | The updated test result object containing the aggregated outcomes from all parallel workers. |
handle_event()
@classmethod
def handle_event(
result: unittest.TestResult,
tests: list,
event: tuple
)
Dispatches test execution events received from worker processes to the main result collector. It maps worker event data back to specific test instances or error holders.
Parameters
| Name | Type | Description |
|---|---|---|
| result | unittest.TestResult | The result collector that will process the specific event (e.g., addSuccess, addError). |
| tests | list | The list of test cases belonging to the subsuite that generated the event. |
| event | tuple | A tuple containing the event name, the test index, and any associated arguments or tracebacks. |
initialize_suite()
@classmethod
def initialize_suite()
Prepares the suite for parallel execution by capturing database settings and serialized contents. This ensures that worker processes spawned via 'spawn' or 'forkserver' have access to the necessary environment state.