Parallel step

A pipeline step that executes multiple branches in parallel.

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id)
Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

NONE

ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BranchingStep, HasInputsMixin

A pipeline step that executes multiple branches in parallel.

This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.

The step supports two execution modes, controlled by the `squash` parameter:

1. Squashed (default): uses `asyncio.gather()` to run branches in parallel within a single LangGraph node. Characteristics:
   a. Better raw performance
   b. Simpler implementation
   c. Less overhead
   d. Less transparent for debugging and tracing
2. Expanded (`squash=False`): creates a native LangGraph structure with multiple parallel paths. Characteristics:
   a. More native LangGraph integration
   b. More transparent for debugging and tracing

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | `str` | A unique identifier for this pipeline step. |
| branches | `dict[str, PipelineSteps]` | The branches to execute in parallel. |
| input_map | `dict[str, str \| Val] \| None` | Unified input map. |
| squash | `bool` | Whether to squash execution into a single node. If True, uses `asyncio.gather()` to run branches in parallel, creating a single node; if False, uses native LangGraph structures, creating multiple nodes. |
| retry_policy | `RetryPolicy \| None` | Configuration for retry behavior using LangGraph's RetryPolicy. |

Initialize a new ParallelStep.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | `str` | A unique identifier for this pipeline step. | required |
| branches | `list \| dict[str, PipelineSteps]` | The branches to execute in parallel. List format: each element is a single step or a list of steps to execute sequentially, e.g. `[step1, [step2, step3], step4]`. Dict format: keys are branch names, values are a single step or a list of steps, e.g. `{"analysis": step1, "validation": [step2, step3], "cleanup": step4}`; this enables more intuitive step exclusion using branch names. | required |
| input_states | `list[str] \| None` | Keys from the state to pass to branches. If None, all state keys are passed. | `None` |
| squash | `bool` | Whether to squash execution into a single node. If True, uses `asyncio.gather()` to run branches in parallel, creating a single node; if False, uses native LangGraph structures, creating multiple nodes. | `True` |
| runtime_config_map | `dict[str, str] \| None` | Mapping of input keys to runtime config keys. | `None` |
| fixed_args | `dict[str, Any] \| None` | Fixed arguments to be passed to the component. If None, an empty dictionary is used. | `None` |
| input_map | `InputMapSpec \| None` | Unified input map. Either a dict (arg -> `str \| Val`) or a list whose elements are: a `str` for identity mapping, a `dict[str, str]` for state/config mapping, or a `dict[str, Val]` for fixed args. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. If None, no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. If None, the RaiseStepErrorHandler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. If None, no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. If None, no cache configuration is used. | `None` |

add_to_graph(graph, previous_endpoints, retry_policy=None)

Handle both squashed and expanded modes.

In squashed mode, the parallel step is added to the graph as a single node. In expanded mode, the parallel step is also added as a single node, and its child steps are added to the graph as well.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| graph | `StateGraph` | The graph to add this step to. | required |
| previous_endpoints | `list[str]` | Endpoints from previous steps to connect to. | required |
| retry_policy | `RetryPolicy \| None` | Retry policy to propagate to child steps. If None, the step's own retry policy is used. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | Exit points after adding all child steps. |

execute(state, runtime) async

Execute all branches in parallel and merge their results.

This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | `dict[str, Any]` | The current state of the pipeline. | required |
| runtime | `Runtime[dict[str, Any] \| BaseModel]` | Runtime information for this step's execution. | required |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any] \| None` | The merged results from all parallel branches, or None if no updates were produced. |

Raises:

| Type | Description |
| --- | --- |
| `CancelledError` | If execution is cancelled; preserved with added context. |
| `BaseInvokerError` | If an error occurs during LM invocation. |
| `RuntimeError` | For all other exceptions during execution, wrapped with context information. |
| `TimeoutError` | If execution times out; preserved with added context. |
| `ValidationError` | If input validation fails. |