Subgraph step

A pipeline step that executes another pipeline as a subgraph.

Author

Dimitrij Ray (dimitrij.ray@gdplabs.id)
Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

[1] https://langchain-ai.github.io/langgraph/

SubgraphStep(name, subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BaseCompositeStep, HasInputsMixin

A pipeline step that executes another pipeline as a subgraph.

This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

subgraph Pipeline

The pipeline to be executed as a subgraph.

input_map dict[str, str | Val] | None

Unified input map from subgraph argument names to parent state or runtime config keys (str) or to fixed values (Val).

output_state_map dict[str, str]

Mapping of parent pipeline state keys to subgraph output keys.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new SubgraphStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_state_map dict[str, str] | None

Mapping of subgraph input keys to parent pipeline state keys. Defaults to None.

None
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str | Val) or a list whose elements are any of:
1. str for identity mapping
2. dict[str, str] for state/config mapping
3. dict[str, Val] for fixed args
Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using the SDK's RetryConfig. If None, no retry policy is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to be used for this step. If None, no error handler is applied.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
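
The input_map forms described above can be illustrated with a small, library-free sketch. It is not the SDK's implementation: the Val class here is a hypothetical stand-in for the SDK's Val wrapper, resolve_inputs is an invented helper name, and the lookup order (parent state first, then runtime config) is an assumption made only for the example.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the SDK's Val wrapper: marks a fixed value."""

    value: Any


def resolve_inputs(input_map, state: dict, config: dict) -> dict:
    """Build subgraph inputs from a unified input map (illustrative only)."""
    # Normalize the list form into a single dict of arg -> str | Val.
    if isinstance(input_map, list):
        merged: dict = {}
        for item in input_map:
            if isinstance(item, str):
                merged[item] = item  # identity mapping: same key on both sides
            else:
                merged.update(item)  # state/config mapping or fixed args
        input_map = merged

    resolved: dict = {}
    for arg, source in input_map.items():
        if isinstance(source, Val):
            resolved[arg] = source.value  # fixed argument
        elif source in state:
            resolved[arg] = state[source]  # assumed: parent state is checked first
        elif source in config:
            resolved[arg] = config[source]  # assumed: runtime config is the fallback
        # Keys found in neither place are skipped rather than raising.
    return resolved
```

With a parent state of {"query": "hi", "docs": [1, 2]} and a runtime config of {"top_k": 5}, the list form ["query", {"documents": "docs", "k": "top_k"}, {"mode": Val("fast")}] would resolve to the query, the documents, the configured k, and the fixed mode under this sketch.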

apply_exclusions(exclusions)

Apply exclusions to this subgraph and its children.

A subgraph step requires no internal structural changes: it marks itself and propagates child exclusions uniformly to every step in the subgraph.

Parameters:

Name Type Description Default
exclusions ExclusionSet

The exclusion set to apply.

required

execute(state, runtime) async

Executes the subgraph and processes its output.

This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

The update to the pipeline state after this step's operation. This includes only the new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None.
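
The output behavior described here (parent keys mapped from subgraph output keys, with None for anything missing) can be sketched with plain dictionaries. The helper name map_outputs is hypothetical and the real step may do additional work; this only illustrates the documented mapping direction of output_state_map.

```python
from typing import Any


def map_outputs(
    output_state_map: dict[str, str],
    subgraph_result: dict[str, Any],
) -> dict[str, Any]:
    """Return a partial parent-state update built from a subgraph result.

    Keys of output_state_map are parent state keys; values are subgraph
    output keys. A missing subgraph key yields None instead of raising.
    """
    return {
        parent_key: subgraph_result.get(subgraph_key)
        for parent_key, subgraph_key in output_state_map.items()
    }
```

Only the keys named in the map appear in the update, so any other fields of the subgraph result do not leak into the parent state in this sketch.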

Raises:

Type Description
RuntimeError

If an error occurs during subgraph execution, with details about which step caused the error.
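
A minimal sketch of this error contract, assuming the step wraps any failure from the subgraph in a RuntimeError that names the step. The function names, the message format, and the use of exception chaining are all assumptions made for illustration, not the SDK's actual behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_subgraph_step(
    step_name: str,
    subgraph_fn: Callable[[dict[str, Any]], Awaitable[dict[str, Any]]],
    inputs: dict[str, Any],
) -> dict[str, Any]:
    """Run a subgraph coroutine, re-raising failures as RuntimeError."""
    try:
        return await subgraph_fn(inputs)
    except Exception as exc:
        # Hypothetical message format; the original exception stays chained.
        raise RuntimeError(f"Error in subgraph step '{step_name}': {exc}") from exc


async def failing_subgraph(inputs: dict[str, Any]) -> dict[str, Any]:
    """Stand-in subgraph that always fails."""
    raise ValueError("retriever unavailable")


def demo() -> tuple[str, str]:
    """Trigger the failure and report the message and the chained cause."""
    try:
        asyncio.run(run_subgraph_step("rag_subgraph", failing_subgraph, {}))
    except RuntimeError as err:
        return str(err), type(err.__cause__).__name__
    return "", ""
```

A caller in the parent pipeline can then catch RuntimeError once and still inspect the underlying cause through the chained exception.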