Parallel step
A pipeline step that executes multiple branches in parallel.
References
NONE
ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)
Bases: BranchingStep, HasInputsMixin
A pipeline step that executes multiple branches in parallel.
This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.
The step supports two execution modes controlled by the squash parameter:
1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. Use for:
a. Better raw performance
b. Simpler implementation
c. Less overhead
d. Less transparent for debugging and tracing
2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. Use for:
a. More native LangGraph integration
b. More transparent for debugging and tracing
For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
branches |
dict[str, PipelineSteps]
|
The branches to execute in parallel. |
input_map |
dict[str, str | Val] | None
|
Unified input map. |
squash |
bool
|
Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. |
Initialize a new ParallelStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
required |
branches |
list | dict[str, PipelineSteps]
|
The branches to execute in parallel. Can be either: List format: Each branch can be: 1. A single step 2. A list of steps to execute sequentially Example: [step1, [step2, step3], step4] Dict format: Keys are branch names, values can be: 1. A single step 2. A list of steps to execute sequentially Example: {"analysis": step1, "validation": [step2, step3], "cleanup": step4} Enables more intuitive step exclusion using branch names. |
required |
input_states |
list[str] | None
|
Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None. |
None
|
squash |
bool
|
Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes. Defaults to True. |
True
|
runtime_config_map |
dict[str, str] | None
|
Mapping of input keys to runtime config keys. Defaults to None. |
None
|
fixed_args |
dict[str, Any] | None
|
Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used. |
None
|
input_map |
InputMapSpec | None
|
Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. |
None
|
retry_config |
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler |
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache_store |
'BaseCache' | None
|
Cache store to be used for caching. Defaults to None, in which case no cache store is used. |
None
|
cache_config |
dict[str, Any] | None
|
Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. |
None
|
add_to_graph(graph, previous_endpoints, retry_policy=None)
Handle both squashed and expanded modes.
For squashed mode: add the parallel step as a single node. For expanded mode: add the parallel step as a single node and add children to graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph |
StateGraph
|
The graph to add this step to. |
required |
previous_endpoints |
list[str]
|
Endpoints from previous steps to connect to. |
required |
retry_policy |
RetryPolicy | None
|
Retry policy to propagate to child steps. Defaults to None, in which case the retry policy of the step is used. |
None
|
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: Exit points after adding all child steps. |
execute(state, runtime)
async
Execute all branches in parallel and merge their results.
This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state |
dict[str, Any]
|
The current state of the pipeline. |
required |
runtime |
Runtime[dict[str, Any] | BaseModel]
|
Runtime information for this step's execution. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced. |
Raises:
| Type | Description |
|---|---|
CancelledError
|
If execution is cancelled, preserved with added context. |
BaseInvokerError
|
If an error occurs during LM invocation. |
RuntimeError
|
For all other exceptions during execution, wrapped with context information. |
TimeoutError
|
If execution times out, preserved with added context. |
ValidationError
|
If input validation fails. |