Parallel step

A pipeline step that executes multiple branches in parallel.

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id), Kadek Denaya (kadek.d.r.diana@gdplabs.id)

ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes multiple branches in parallel.

This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.

The step supports two execution modes controlled by the squash parameter:

1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. This mode offers better raw performance, a simpler implementation, and less overhead, but it is less transparent for debugging and tracing.

2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. This mode offers more native LangGraph integration and is more transparent for debugging and tracing.
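The squashed mode can be illustrated with a minimal, library-independent sketch. It assumes each branch is an async callable that returns a partial state update; `run_squashed`, `retrieve`, and `classify` are hypothetical names, and the merge rule (later branches win on key collisions) is an assumption rather than documented ParallelStep behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable

State = dict[str, Any]
# Hypothetical branch type: an async callable returning a partial state update.
Branch = Callable[[State], Awaitable[State]]


async def run_squashed(branches: list[Branch], state: State) -> State:
    """Run every branch concurrently and merge the partial updates."""
    # asyncio.gather schedules all coroutines at once and returns their
    # results in the same order as the inputs.
    updates = await asyncio.gather(*(branch(state) for branch in branches))
    merged: State = {}
    for update in updates:
        merged.update(update)  # assumption: later branches win on key collisions
    return merged


async def retrieve(state: State) -> State:
    await asyncio.sleep(0.01)  # stand-in for I/O-bound work
    return {"documents": [f"doc about {state['query']}"]}


async def classify(state: State) -> State:
    await asyncio.sleep(0.01)  # stand-in for I/O-bound work
    return {"topic": "general"}


result = asyncio.run(run_squashed([retrieve, classify], {"query": "parallelism"}))
print(result)
```

Because both branches wait concurrently, the total wall time is close to that of the slowest branch rather than the sum of all branches.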

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
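The key filtering described above might look like the following sketch. `select_input_states` is a hypothetical helper, not part of the library, and silently skipping keys that are absent from the state is an assumption.

```python
from typing import Any, Optional


def select_input_states(
    state: dict[str, Any], input_states: Optional[list[str]]
) -> dict[str, Any]:
    """Return the part of the state that branches should receive."""
    if input_states is None:
        # Default: pass every key through (shallow copy).
        return dict(state)
    # Assumption: keys missing from the state are skipped rather than raising.
    return {key: state[key] for key in input_states if key in state}


state = {"query": "hello", "history": ["..."] * 1000, "user_id": 7}
print(select_input_states(state, ["query", "user_id"]))
```

Only the selected keys are handed to the branches, so a large entry such as `history` is not passed along when the branches do not need it.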

Attributes:

name (str): A unique identifier for this pipeline step.

branches (list[BasePipelineStep | list[BasePipelineStep]]): The branches to execute in parallel.

squash (bool): Whether to squash execution into a single node.
1. If True, uses asyncio.gather() to run branches in parallel. This creates a single node.
2. If False, uses native LangGraph structures for parallelism. This creates multiple nodes.

input_states (list[str] | None): Keys from the state that should be passed to branches. If None, all state keys are passed.
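Since a branch can be either a single step or a list of steps run sequentially, a runner would need to normalize both shapes. The sketch below shows one way to do that with plain async callables standing in for pipeline steps. `normalize_branch`, `run_branch`, and the example steps are hypothetical, and feeding each step the state updated by the previous one is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Union

State = dict[str, Any]
Step = Callable[[State], Awaitable[State]]  # hypothetical stand-in for a pipeline step
BranchSpec = Union[Step, list[Step]]


def normalize_branch(branch: BranchSpec) -> list[Step]:
    """Treat a single step as a one-element list of steps."""
    return branch if isinstance(branch, list) else [branch]


async def run_branch(branch: BranchSpec, state: State) -> State:
    """Run the steps of one branch in order and collect their updates."""
    local_state = dict(state)  # copy so sibling branches do not see partial writes
    updates: State = {}
    for step in normalize_branch(branch):
        update = await step(local_state)
        local_state.update(update)  # assumption: the next step sees this update
        updates.update(update)
    return updates


async def add_one(state: State) -> State:
    return {"value": state["value"] + 1}


async def double(state: State) -> State:
    return {"value": state["value"] * 2}


print(asyncio.run(run_branch([add_one, double], {"value": 3})))  # prints {'value': 8}
print(asyncio.run(run_branch(add_one, {"value": 3})))  # prints {'value': 4}
```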

Initialize a new ParallelStep.

Parameters:

name (str, required): A unique identifier for this pipeline step.

branches (list[BasePipelineStep | list[BasePipelineStep]], required): The branches to execute in parallel. Each branch can be:
1. A single step
2. A list of steps to execute sequentially

input_states (list[str] | None, default None): Keys from the state to pass to branches. If None, all state keys are passed.

squash (bool, default True): Whether to squash execution into a single node.
1. If True, uses asyncio.gather() to run branches in parallel. This creates a single node.
2. If False, uses native LangGraph structures for parallelism. This creates multiple nodes.

runtime_config_map (dict[str, str] | None, default None): Mapping of input keys to runtime config keys.

fixed_args (dict[str, Any] | None, default None): Fixed arguments to be passed to the component. If None, an empty dictionary is used.
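One plausible reading of runtime_config_map and fixed_args is sketched below. `resolve_inputs` is a hypothetical helper and the runtime config is modelled as a plain dictionary. The lookup semantics (an input key is filled from the config key it maps to) and the precedence (runtime values override fixed arguments) are assumptions, not documented behavior.

```python
from typing import Any, Optional


def resolve_inputs(
    config: dict[str, Any],
    runtime_config_map: Optional[dict[str, str]] = None,
    fixed_args: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build component inputs from fixed arguments and runtime config values."""
    # None is replaced by a fresh dict on each call, which avoids sharing a
    # mutable default between calls.
    runtime_config_map = runtime_config_map or {}
    fixed_args = fixed_args or {}

    inputs = dict(fixed_args)
    for input_key, config_key in runtime_config_map.items():
        if config_key in config:
            # Assumption: a runtime value overrides a fixed argument of the same name.
            inputs[input_key] = config[config_key]
    return inputs


print(
    resolve_inputs(
        config={"user_top_k": 5},
        runtime_config_map={"top_k": "user_top_k"},
        fixed_args={"top_k": 3, "lang": "en"},
    )
)
```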

add_to_graph(graph, previous_endpoints)

Add this step to the graph.

Uses either squashed or expanded approach based on the squash parameter.

Parameters:

graph (StateGraph, required): The graph to add this step to.

previous_endpoints (list[str], required): The endpoints from previous steps to connect to.

Returns:

list[str]: The endpoint(s) of this step.
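To make the relationship between previous_endpoints and the returned endpoints concrete, here is a pure-Python sketch of a fan-out wiring. It does not use LangGraph; `wire_parallel` and the node names are hypothetical, and the wiring rule (every previous endpoint connects to the first node of each branch, and the last node of each branch becomes an endpoint) is an assumption about how an expanded layout could look.

```python
def wire_parallel(
    previous_endpoints: list[str], branches: list[list[str]]
) -> tuple[list[tuple[str, str]], list[str]]:
    """Return (edges, endpoints) for a fan-out over branches of node names."""
    edges: list[tuple[str, str]] = []
    endpoints: list[str] = []
    for nodes in branches:
        # Fan out: connect every previous endpoint to the branch entry node.
        for prev in previous_endpoints:
            edges.append((prev, nodes[0]))
        # Chain the nodes inside the branch sequentially.
        for src, dst in zip(nodes, nodes[1:]):
            edges.append((src, dst))
        # The last node of the branch is where later steps would attach.
        endpoints.append(nodes[-1])
    return edges, endpoints


edges, endpoints = wire_parallel(["start"], [["a1", "a2"], ["b1"]])
print(edges)
print(endpoints)
```

In a squashed layout there would be only one node, so a single endpoint would be expected instead.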

execute(state, config) async

Execute all branches in parallel and merge their results.

This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.

Parameters:

state (dict[str, Any], required): The current state of the pipeline.

config (RunnableConfig, required): Runtime configuration for this step's execution.

Returns:

dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced.
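The return contract can be sketched as a small merge helper. `merge_updates` is a hypothetical name, and treating an empty dictionary the same as None is an assumption.

```python
from typing import Any, Optional


def merge_updates(
    results: list[Optional[dict[str, Any]]],
) -> Optional[dict[str, Any]]:
    """Merge per-branch updates, returning None when nothing was produced."""
    merged: dict[str, Any] = {}
    for update in results:
        if update:  # skips both None and empty dicts
            merged.update(update)
    # An empty dict is falsy, so `or None` maps "no updates" to None.
    return merged or None


print(merge_updates([{"a": 1}, None, {"b": 2}]))  # prints {'a': 1, 'b': 2}
print(merge_updates([None, {}]))  # prints None
```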

Raises:

CancelledError: If execution is cancelled, preserved with added context.

BaseInvokerError: If an error occurs during LM invocation.

RuntimeError: For all other exceptions during execution, wrapped with context information.

TimeoutError: If execution times out, preserved with added context.

ValidationError: If input validation fails.
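The error-handling policy listed above could be approximated as follows. This is only a sketch: `run_with_context` is a hypothetical wrapper, the message format is invented for illustration, and the library-specific BaseInvokerError and ValidationError are left out because their import paths are not given here.

```python
import asyncio
from typing import Any, Awaitable


async def run_with_context(step_name: str, work: Awaitable[Any]) -> Any:
    """Await `work`, adding the step name to any error that escapes."""
    try:
        return await work
    except asyncio.CancelledError as exc:
        # Keep the exception type so cancellation still propagates as such.
        raise asyncio.CancelledError(f"Step '{step_name}' was cancelled") from exc
    except asyncio.TimeoutError as exc:
        # Keep the exception type, add context.
        raise asyncio.TimeoutError(f"Step '{step_name}' timed out") from exc
    except Exception as exc:
        # Everything else is wrapped, with the original kept as __cause__.
        raise RuntimeError(f"Step '{step_name}' failed: {exc}") from exc


async def failing_branch() -> None:
    raise ValueError("bad input")


try:
    asyncio.run(run_with_context("parallel", failing_branch()))
except RuntimeError as exc:
    error = exc
    print(error)  # prints Step 'parallel' failed: bad input
```

Chaining with `from exc` keeps the original traceback available through `__cause__`, so the added context does not hide the root failure.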

get_mermaid_diagram()

Combines the base diagram and nested step diagrams into a complete visualization.

If the parallel step is squashed, custom logic is used to represent the structure. If it is not squashed, the structure is created natively using LangGraph nodes.

Returns:

str: Complete Mermaid diagram for this parallel step.
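For orientation, a Mermaid flowchart for a parallel structure can be assembled from plain strings, as in the sketch below. `parallel_mermaid`, the `_start`/`_end` node naming, and the layout are hypothetical. The actual output of get_mermaid_diagram() may look different.

```python
def parallel_mermaid(name: str, branches: list[list[str]]) -> str:
    """Build a Mermaid flowchart with one path per branch."""
    lines = ["flowchart TD"]
    for branch in branches:
        # Each branch runs from a shared start node to a shared end node.
        path = [f"{name}_start", *branch, f"{name}_end"]
        for src, dst in zip(path, path[1:]):
            lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)


print(parallel_mermaid("p", [["a"], ["b", "c"]]))
```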