State operator step
A pipeline step that executes an operator or callable and updates the pipeline state.
StateOperatorStep(name, input_states=None, output_state=None, operation=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that performs an operation on the pipeline state and updates it.
This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.
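As a rough illustration of the contract described above, the sketch below uses a plain function as the operation: it takes one dictionary of inputs and returns a result. The `run_step` helper, the state keys, and the `max_words` config key are hypothetical stand-ins written for this example. They are not part of the library and only imitate the select-inputs, execute, update-state flow.

```python
from typing import Any


def count_words(inputs: dict[str, Any]) -> int:
    """An operation: receives one dict of inputs and returns a result."""
    words = inputs["text"].split()
    return min(len(words), inputs["limit"])


def run_step(operation, state, config, input_states, runtime_config_map, output_state):
    """Hypothetical stand-in for the step; not the library implementation."""
    # Select only the state keys the operation needs.
    inputs = {key: state[key] for key in input_states}
    # Map operation argument names to runtime configuration keys.
    inputs.update({arg: config[cfg_key] for arg, cfg_key in runtime_config_map.items()})
    # Return a partial update keyed by output_state, not the entire state.
    return {output_state: operation(inputs)}


state = {"text": "a pipeline step updates state", "unused": 1}
config = {"max_words": 3}
update = run_step(count_words, state, config, ["text"], {"limit": "max_words"}, "word_count")

# Merging the partial update leaves unrelated keys untouched.
state = {**state, **update}
```

The operation never sees the `unused` key, which is the point of declaring inputs explicitly: the callable stays independent of the overall state layout.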
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | A unique identifier for this pipeline step. |
| `input_map` | `dict[str, str \| Val] \| None` | Unified input map. |
| `output_state` | `str \| list[str]` | Key(s) to store the operation result in the pipeline state. |
| `operation` | `Callable[[dict[str, Any]], Any]` | The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration. |
| `retry_policy` | `RetryPolicy \| None` | Configuration for retry behavior using LangGraph's RetryPolicy. |
Initializes a new StateOperatorStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | A unique identifier for this pipeline step. | required |
| `input_states` | `list[str]` | Keys of the state data required by the operation. | `None` |
| `output_state` | `str \| list[str]` | Key(s) to store the operation result in the pipeline state. | `None` |
| `operation` | `Callable[[dict[str, Any]], Any]` | The operation to execute. It should accept a dictionary of input data and return the operation result. | `None` |
| `runtime_config_map` | `dict[str, str] \| None` | Mapping of operation input arguments to runtime configuration keys. Defaults to None. | `None` |
| `fixed_args` | `dict[str, Any] \| None` | Fixed arguments to be passed to the operation. Defaults to None. | `None` |
| `input_map` | `InputMapSpec \| None` | Unified input map. Can be a dict (arg -> `str \| Val`) or a list whose elements are: (1) `str` for identity mapping, (2) `dict[str, str]` for state/config mapping, (3) `dict[str, Val]` for fixed args. Defaults to None. | `None` |
| `retry_config` | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| `error_handler` | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | `None` |
| `cache_store` | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| `cache_config` | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
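The three list-element forms accepted by `input_map` can be easier to follow with a small resolver. The sketch below is an illustration only: the `Val` dataclass is a hypothetical stand-in for the library's `Val` wrapper, `resolve_inputs` is not a library function, and the lookup order (state first, then runtime configuration) is an assumption made for this example.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's Val wrapper (marks a fixed value)."""
    value: Any


def resolve_inputs(input_map, state: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Resolve a dict- or list-form input map into operation arguments.

    Assumption: a string source is looked up in the state first, then in the
    runtime configuration. The library's actual lookup order may differ.
    """
    # Normalize the list form into one flat dict of arg -> source.
    if isinstance(input_map, dict):
        flat = dict(input_map)
    else:
        flat = {}
        for item in input_map:
            if isinstance(item, str):
                flat[item] = item  # identity mapping: arg name equals key name
            else:
                flat.update(item)  # dict of arg -> str, or arg -> Val

    resolved = {}
    for arg, source in flat.items():
        if isinstance(source, Val):
            resolved[arg] = source.value  # fixed argument, no lookup
        elif source in state:
            resolved[arg] = state[source]
        else:
            resolved[arg] = config[source]
    return resolved


state = {"query": "hello", "docs": ["a", "b"]}
config = {"k": 2}
args = resolve_inputs(
    ["query", {"documents": "docs", "top_k": "k"}, {"lang": Val("en")}],
    state,
    config,
)
```

Under these assumptions, one `input_map` covers what `input_states`, `runtime_config_map`, and `fixed_args` express separately, which is presumably why the documentation calls it unified.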
execute(state, runtime)
async
Executes the operation and processes its output.
This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `PipelineState` | The current state of the pipeline, containing all data. | required |
| `runtime` | `Runtime[dict[str, Any] \| BaseModel]` | Runtime information for this step's execution. | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If an error occurs during operation execution. |
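To make the return and error behavior concrete, here is a minimal asynchronous sketch. `execute_sketch` is a hypothetical stand-in, not the library's `execute` method. It assumes a single string `output_state` and that operation failures are re-raised as `RuntimeError` with the original exception attached as the cause; the error message format is invented for this example.

```python
import asyncio
from typing import Any


async def execute_sketch(operation, inputs: dict[str, Any], output_state: str) -> dict[str, Any]:
    """Hypothetical stand-in for execute(); not the library implementation."""
    try:
        result = operation(inputs)
    except Exception as exc:
        # Surface operation failures as RuntimeError, keeping the original cause.
        raise RuntimeError(f"operation failed: {exc}") from exc
    # Return only the partial update, keyed by output_state.
    return {output_state: result}


def divide(inputs: dict[str, Any]) -> float:
    return inputs["a"] / inputs["b"]


ok = asyncio.run(execute_sketch(divide, {"a": 6, "b": 3}, "ratio"))

try:
    asyncio.run(execute_sketch(divide, {"a": 1, "b": 0}, "ratio"))
    failed = None
except RuntimeError as err:
    failed = err
```

A caller that needs the underlying failure can inspect `failed.__cause__`, which in this sketch is the `ZeroDivisionError` raised by `divide`.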