Skip to content

State operator step

A pipeline step that executes an operator or callable and updates the pipeline state.

Author

Dimitrij Ray (dimitrij.ray@gdplabs.id) Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

NONE

StateOperatorStep(name, input_states=None, output_state=None, operation=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that performs an operation on the pipeline state and updates it.

This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

operation Callable[[dict[str, Any]], Any]

The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new StateOperatorStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
input_states list[str]

Keys of the state data required by the operation.

None
output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

None
operation Callable[[dict[str, Any]], Any]

The operation to execute. It should accept a dictionary of input data and return the operation result.

None
runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the operation and processes its output.

This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during operation execution.