Skip to content

Interrupt step

Pipeline step for pausing execution.

This module provides the InterruptStep class, which leverages LangGraph's native interrupt function to pause the pipeline's execution at a specific node.

InterruptStep(name, message='Pipeline interrupted.', resume_value_map=None, **kwargs)

Bases: BasePipelineStep

A pipeline step that pauses execution using LangGraph's interrupt feature.

When the pipeline reaches this step, it will throw an interrupt which pauses execution and returns control to the caller (along with the current state). To resume, the user must invoke the pipeline again with a Command(resume=...) payload and the same thread_id.

The value returned from the interrupt() call (which is whatever the user provides in Command(resume=val)) will be stored in the state key specified by resume_value_map if provided.

Initializes an InterruptStep.

Parameters:

Name Type Description Default
name str

The unique identifier for this step.

required
message str | dict[str, Any]

The payload to emit when the interrupt occurs. Defaults to "Pipeline interrupted.".

'Pipeline interrupted.'
resume_value_map str | list[str] | None

State key(s) where the resume value will be stored. Defaults to None.

None
**kwargs Any

Additional configuration (e.g., error_handler) for the BasePipelineStep.

{}

Raises:

Type Description
ValueError

If cache is provided in kwargs, as this step cannot be cached.

execute(state, runtime, config=None) async

Execute the interrupt and handle the resumed value.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime

The pipeline runtime.

required
config RunnableConfig | None

The RunnableConfig.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: State updates containing the resumed value if a resume_value_map is configured.

Raises:

Type Description
TypeError

If resume_value_map is a list but the resumed value is not a dictionary.