State operator step

A pipeline step that executes an operator or callable and updates the pipeline state.

Author

Dimitrij Ray (dimitrij.ray@gdplabs.id)

StateOperatorStep(name, input_states, output_state, operation, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that performs an operation on the pipeline state and updates it.

This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.

Attributes:

name (str): A unique identifier for this pipeline step.

input_states (list[str]): Keys of the state data required by the operation.

output_state (str | list[str]): Key or keys under which the operation result is stored in the pipeline state.

operation (Callable[[dict[str, Any]], Any]): The operation to execute. It accepts a dictionary of input data, which consists of the extracted state and runtime configuration.

runtime_config_map (dict[str, str] | None): Mapping of operation input arguments to runtime configuration keys.

fixed_args (dict[str, Any] | None): Fixed arguments to be passed to the operation.

Initializes a new StateOperatorStep.

Parameters:

name (str, required): A unique identifier for this pipeline step.

input_states (list[str], required): Keys of the state data required by the operation.

output_state (str | list[str], required): Key or keys under which the operation result is stored in the pipeline state.

operation (Callable[[dict[str, Any]], Any], required): The operation to execute. It should accept a dictionary of input data and return the operation result.

runtime_config_map (dict[str, str] | None, default None): Mapping of operation input arguments to runtime configuration keys.

fixed_args (dict[str, Any] | None, default None): Fixed arguments to be passed to the operation.
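To make the roles of input_states, runtime_config_map, and fixed_args concrete, here is a minimal sketch of how the single input dictionary for the operation might be assembled. The helper build_operation_inputs is hypothetical and not part of the library; passing the runtime configuration as a plain dict and letting fixed arguments take precedence are assumptions made for illustration only.

```python
from __future__ import annotations

from typing import Any


def build_operation_inputs(
    state: dict[str, Any],
    runtime_config: dict[str, Any],
    input_states: list[str],
    runtime_config_map: dict[str, str] | None = None,
    fixed_args: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Hypothetical helper: assemble the dict handed to `operation`."""
    # Select only the requested keys from the pipeline state.
    inputs = {key: state[key] for key in input_states}
    # Map each operation argument name to a value found under the
    # corresponding runtime configuration key.
    for arg_name, config_key in (runtime_config_map or {}).items():
        if config_key in runtime_config:
            inputs[arg_name] = runtime_config[config_key]
    # Assumption: fixed arguments are applied last, so they win on conflict.
    inputs.update(fixed_args or {})
    return inputs


inputs = build_operation_inputs(
    state={"query": "hello", "history": []},
    runtime_config={"max_results": 5},
    input_states=["query"],
    runtime_config_map={"top_k": "max_results"},
    fixed_args={"lowercase": True},
)
print(inputs)
```

In this sketch the operation sees top_k rather than max_results, because the map goes from operation argument name to runtime configuration key, and the unselected history key is never passed along.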

execute(state, config) async

Executes the operation and processes its output.

This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.
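The four stages named above can be sketched as a standalone coroutine. This is an illustration under stated assumptions, not the library's implementation: run_step_sketch is a hypothetical name, raising ValueError on missing keys is a guess, awaiting coroutine results is an assumption, and pairing a list-valued output_state with the result by position is one plausible reading of "Key(s)".

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Any, Callable


async def run_step_sketch(
    state: dict[str, Any],
    input_states: list[str],
    output_state: str | list[str],
    operation: Callable[[dict[str, Any]], Any],
) -> dict[str, Any]:
    """Hypothetical stand-in for the validate/prepare/execute/format flow."""
    # 1. Validate: every requested key must exist in the state.
    missing = [key for key in input_states if key not in state]
    if missing:
        raise ValueError(f"Missing input state keys: {missing}")
    # 2. Prepare: extract only the requested data.
    inputs = {key: state[key] for key in input_states}
    # 3. Execute: call the operation, awaiting it if it returns an awaitable
    #    (assumption: both sync and async operations are acceptable).
    result = operation(inputs)
    if inspect.isawaitable(result):
        result = await result
    # 4. Format: build a partial state update (assumed positional pairing).
    if isinstance(output_state, list):
        if len(result) != len(output_state):
            raise ValueError("Result length does not match output_state")
        return dict(zip(output_state, result))
    return {output_state: result}


def split_name(inputs: dict[str, Any]) -> tuple[str, str]:
    first, last = inputs["full_name"].split(" ", 1)
    return first, last


update = asyncio.run(
    run_step_sketch(
        state={"full_name": "Ada Lovelace", "id": 7},
        input_states=["full_name"],
        output_state=["first_name", "last_name"],
        operation=split_name,
    )
)
print(update)
```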

Parameters:

state (dict[str, Any], required): The current state of the pipeline, containing all data.

config (RunnableConfig, required): Runtime configuration for this step's execution.

Returns:

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state.
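Because the return value is a partial update rather than the full state, something on the caller side has to merge it. The snippet below is a rough sketch of that merge; apply_update is a hypothetical helper, and how the real pipeline runtime combines updates is not specified here.

```python
from __future__ import annotations

from typing import Any


def apply_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical caller-side merge that leaves the original state untouched."""
    # Keys in `update` overwrite or extend the state; all other keys carry over.
    return {**state, **update}


state = {"query": "hello", "answer": None}
update = {"answer": "world"}  # shaped like a value returned by execute()
new_state = apply_update(state, update)
print(new_state)
```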

Raises:

RuntimeError: If an error occurs during operation execution.
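One common way to surface operation failures as RuntimeError is to wrap the call and chain the original exception. The sketch below assumes that pattern; run_operation and the message format are hypothetical, and the library may word or structure its error differently.

```python
from __future__ import annotations

from typing import Any, Callable


def run_operation(
    step_name: str,
    operation: Callable[[dict[str, Any]], Any],
    inputs: dict[str, Any],
) -> Any:
    """Hypothetical wrapper: re-raise any operation failure as RuntimeError."""
    try:
        return operation(inputs)
    except Exception as exc:
        # `from exc` keeps the original exception available as __cause__.
        raise RuntimeError(f"Error in step '{step_name}': {exc}") from exc


def failing_operation(inputs: dict[str, Any]) -> float:
    return 10 / inputs["divisor"]


try:
    run_operation("divide", failing_operation, {"divisor": 0})
except RuntimeError as err:
    message = str(err)
    cause = err.__cause__
    print(message)
```

Callers that need the underlying failure can inspect the chained exception instead of parsing the message.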