Steps

Contains the base classes and implementations of pipeline steps.

ComponentStep(name, component, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes a specific component.

This step wraps a component, manages its inputs and outputs, and integrates it into the pipeline.
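
Example (illustrative sketch; Summarizer and the state keys shown are hypothetical):

step = ComponentStep(
    name="summarize",
    component=Summarizer(),            # hypothetical Component subclass
    input_map={"text": "document"},    # component arg "text" <- state key "document"
    output_state="summary",            # store the result under state key "summary"
)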

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

component Component

The component to be executed in this step.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initializes a new ComponentStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
component Component

The component to be executed in this step.

required
input_state_map dict[str, str]

Mapping of component input arguments to pipeline state keys. Keys are input arguments expected by the component, values are corresponding state keys.

None
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of component input arguments to runtime configuration keys. Keys are input arguments expected by the component, values are runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the component and processes its output.

This method validates inputs, prepares data, executes the component, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The update to the pipeline state after this step's operation, or None if output_state is None. When not None, this includes new or modified data produced by the component, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during component execution.

TimeoutError

If the component execution times out.

CancelledError

If the component execution is cancelled.

ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BranchingStep, HasInputsMixin

A conditional pipeline step that conditionally executes different branches based on specified conditions.

This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.

A minimal usage requires defining the branches to execute based on a condition, which is a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs:

1. If the condition is a Component, input_map is used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. In this case, input_map is not used to build the payload and should not be passed.

Example:

ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)

This will execute step_a if the query contains "<A>", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches, the step will raise an error.
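
Example with a Component condition (illustrative sketch; RouteClassifier is a hypothetical Component that returns a branch key):

router = ConditionalStep(
    name="router",
    branches={"faq": faq_step, DEFAULT_BRANCH: fallback_step},
    condition=RouteClassifier(),      # hypothetical Component returning "faq" or another key
    input_map={"query": "query"},     # map the component's "query" input from the state
    output_state="route",             # optionally store the raw condition result
)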

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps.

condition list[ConditionType] | None

The condition(s) to evaluate for branch selection.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | None

Key to store the condition result in the state, if desired.

condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initializes a new ConditionalStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
condition ConditionType | list[ConditionType] | None

The condition(s) to evaluate for branch selection. If a Callable, it receives the merged state and config as keyword arguments. If None, the condition is evaluated from the state. Defaults to None.

None
input_state_map dict[str, str] | None

A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
output_state str | None

Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None.

None
condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";").

lambda x: ';'.join(str(i) for i in x)
runtime_config_map dict[str, str] | None

A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the conditional step, determines the route, and returns a Command.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
Command Command

A LangGraph Command object with 'goto' for routing and 'update' for state changes.

execute_direct(state, runtime) async

Execute this step directly, handling both branch selection and execution.

This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Updates to apply to the pipeline state, or None if no updates.

select_path(state, runtime) async

Determines the logical route key based on the evaluated condition(s).

This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
str str

The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key.

EmptyStepErrorHandler(output_state)

Bases: BaseStepErrorHandler

Strategy that replaces the values of the configured output states with None on error.

Attributes:

Name Type Description
output_state list[str]

Output key(s) to map input values to.

Initialize the strategy with the output state mapping.

Parameters:

Name Type Description Default
output_state str | list[str]

Output key(s) to map input values to. Can be a single string or a list of strings.

required

FallbackStepErrorHandler(fallback)

Bases: BaseStepErrorHandler

Strategy that executes a fallback callable on error.

Attributes:

Name Type Description
fallback Callable[[Exception, dict[str, Any], Runtime[dict[str, Any] | BaseModel], ErrorContext], Any]

A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state.

Initialize the strategy with a fallback callable.

Parameters:

Name Type Description Default
fallback Callable[[Exception, dict[str, Any], Runtime[dict[str, Any] | BaseModel], ErrorContext], Any]

A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state.

required
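
Example (illustrative sketch; the fallback callable follows the (error, state, runtime, context) signature described above, and the state keys are hypothetical):

def auth_fallback(error, state, runtime, context):
    # Return a partial state update to use in place of the failed step's output.
    return {"auth_result": False, "error_message": str(error)}

handler = FallbackStepErrorHandler(fallback=auth_fallback)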

GuardStep(name, condition, success_branch, failure_branch=None, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: ConditionalStep

A conditional step that can terminate pipeline execution if a condition is not met.

This step evaluates a condition and either:

1. Continues execution through the success_branch if the condition is True.
2. Executes the failure_branch and terminates if the condition is False.

Example
pipeline = (
    step_a
    | GuardStep(
        name="auth_check",
        condition=lambda x: x["is_authenticated"],
        success_branch=step_b,
        failure_branch=error_handling_step,
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

condition ConditionType

The condition to evaluate.

input_map dict[str, str | Any] | None

Unified input map.

success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately on False condition.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new GuardStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
condition ConditionType

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

KeepStepErrorHandler()

Bases: BaseStepErrorHandler

Strategy that preserves the current state on error.

Initialize the keep error handler.

LogStep(name, message, is_template=True, emit_kwargs=None, retry_config=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep

A specialized pipeline step for logging messages.

This step uses the Messenger component to log messages during pipeline execution. It supports both plain text messages and template messages with placeholders for state variables.
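
Example (illustrative sketch; assumes the pipeline state contains a query key):

log_step = LogStep(
    name="progress_log",
    message="Processing query: {query}",  # "{query}" is filled from the pipeline state
)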

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

messenger Messenger

The messenger component used to format and send messages.

emit_kwargs dict[str, Any]

Additional arguments to pass to the event emitter.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new LogStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
message str

The message to be logged, may contain placeholders enclosed in curly braces.

required
is_template bool

Whether the message contains placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional arguments to pass to the event emitter. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the log step by formatting and emitting the message.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

Raises:

Type Description
RuntimeError

If an error occurs during message emission.

MapReduceStep(name, output_state, map_func, reduce_func=lambda results: results, input_state_map=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep, HasInputsMixin

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args).
2. A reduce function that combines all the mapped results.

Note on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.

Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match list lengths.
4. If there are no list inputs, applies the map function once to the whole input.

Internally, this step uses asyncio.gather() for efficient parallel execution.
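
Example (illustrative sketch; count_words_async is a hypothetical coroutine, and "documents" is assumed to hold a list in the state):

word_count_step = MapReduceStep(
    name="word_counts",
    output_state="total_words",
    map_func=count_words_async,            # async map function for true parallelism
    reduce_func=sum,                       # combine the per-item results
    input_map={"document": "documents"},   # each list item is mapped in parallel
)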

Attributes:

Name Type Description
name str

A unique identifier for this step.

map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. Will be run in parallel if the function is an asynchronous function.

reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results.

input_map dict[str, str | Any] | None

Unified input map.

output_state str

Key to store the reduced result in the pipeline state.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initialize a new MapReduceStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
input_state_map dict[str, str] | None

Mapping of function arguments to pipeline state keys. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of arguments to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. If provided, it will be used directly instead of synthesizing from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Execute the map and reduce operations.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The reduced result stored under output_state.

Raises:

Type Description
RuntimeError

If an error occurs during execution.

NoOpStep(name, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep

A step that does nothing.

This step is useful when you want to add a step that does not perform any processing. For example, you can use this step to implement a toggle pattern for a certain component.

Example:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "execute": step_b,
            "continue": NoOpStep("no_op")
        },
        condition=lambda x: "execute" if x["should_execute"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

Initializes a new pipeline step.

Parameters:

Name Type Description Default
name str

A unique identifier for the pipeline step.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. The RetryConfig is automatically converted to LangGraph's RetryPolicy when needed for internal use. Note that timeout is not supported and will be ignored.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

The cache store to use for caching step results. Defaults to None. If None, no caching will be used.

None
cache_config dict[str, Any] | None

Configuration for the cache store:

1. key_func: A function to generate cache keys. If None, the cache instance will use its own key function.
2. name: The name of the cache. If None, the name defaults to "step_{step_name}".
3. ttl: The time-to-live for the cache. If None, the cache will not have a TTL.
4. matching_strategy: The strategy for matching cache keys. If None, the cache instance will use "exact".
5. matching_config: Configuration for the matching strategy. If None, the cache instance will use its own default matching strategy configuration.

None
Caching Mechanism

When a cache_store is provided, the step's execution method is automatically wrapped with a cache decorator. This means:

1. Before execution, the cache is checked for existing results based on input parameters.
2. If a cached result exists and is valid, it is returned immediately.
3. If no cached result exists, the step executes normally and the result is cached.
4. Cache keys are generated from the step's input state and configuration.
5. The cache name defaults to "step_{step_name}" if not specified.
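
Example (illustrative sketch; my_cache stands for any BaseCache implementation, and the cache_config keys follow the description above):

step = NoOpStep(
    name="cached_noop",
    cache_store=my_cache,                    # any BaseCache implementation
    cache_config={
        "name": "step_cached_noop",          # defaults to "step_{step_name}" if omitted
        "ttl": 300,                          # cache entries expire after 300 seconds
        "matching_strategy": "exact",        # default key-matching strategy
    },
)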

execute(state, runtime) async

Executes this step, which does nothing.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BranchingStep, HasInputsMixin

A pipeline step that executes multiple branches in parallel.

This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.

The step supports two execution modes, controlled by the squash parameter:

1. Squashed (default): uses asyncio.gather() to run branches in parallel within a single LangGraph node. This offers better raw performance, a simpler implementation, and less overhead, but is less transparent for debugging and tracing.
2. Expanded (squash=False): creates a native LangGraph structure with multiple parallel paths. This offers more native LangGraph integration and is more transparent for debugging and tracing.

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
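
Example (illustrative sketch; the branch steps and the "query" state key are hypothetical):

fanout = ParallelStep(
    name="fanout",
    branches={
        "analysis": analysis_step,
        "validation": [validate_step, persist_step],  # runs sequentially within the branch
    },
    input_states=["query"],  # pass only this key to the branches
    squash=True,             # run branches via asyncio.gather() in a single node
)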

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, PipelineSteps]

The branches to execute in parallel.

input_map dict[str, str | Val] | None

Unified input map.

squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initialize a new ParallelStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches list | dict[str, PipelineSteps]

The branches to execute in parallel. Can be either:

1. List format: each element is a single step or a list of steps to execute sequentially. Example: [step1, [step2, step3], step4]
2. Dict format: keys are branch names, values are a single step or a list of steps to execute sequentially. Example: {"analysis": step1, "validation": [step2, step3], "cleanup": step4}. The dict format enables more intuitive step exclusion using branch names.

required
input_states list[str] | None

Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None.

None
squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes. Defaults to True.

True
runtime_config_map dict[str, str] | None

Mapping of input keys to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

add_to_graph(graph, previous_endpoints, retry_policy=None)

Handle both squashed and expanded modes.

For squashed mode: adds the parallel step as a single node. For expanded mode: adds the parallel step as a single node and adds its children to the graph.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

Endpoints from previous steps to connect to.

required
retry_policy RetryPolicy | None

Retry policy to propagate to child steps. Defaults to None, in which case the retry policy of the step is used.

None

Returns:

Type Description
list[str]

list[str]: Exit points after adding all child steps.

execute(state, runtime) async

Execute all branches in parallel and merge their results.

This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced.

Raises:

Type Description
CancelledError

If execution is cancelled, preserved with added context.

BaseInvokerError

If an error occurs during LM invocation.

RuntimeError

For all other exceptions during execution, wrapped with context information.

TimeoutError

If execution times out, preserved with added context.

ValidationError

If input validation fails.

RaiseStepErrorHandler()

Bases: BaseStepErrorHandler

Strategy that raises exceptions with enhanced context.

Initialize the raise error handler.

StateOperatorStep(name, input_states=None, output_state=None, operation=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that performs an operation on the pipeline state and updates it.

This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.
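
Example (illustrative sketch; the "title" and "body" state keys are hypothetical):

combine_step = StateOperatorStep(
    name="combine",
    input_states=["title", "body"],
    output_state="document",
    operation=lambda data: f"{data['title']}\n\n{data['body']}",  # receives the extracted inputs as a dict
)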

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

operation Callable[[dict[str, Any]], Any]

The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new StateOperatorStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
input_states list[str]

Keys of the state data required by the operation.

None
output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

None
operation Callable[[dict[str, Any]], Any]

The operation to execute. It should accept a dictionary of input data and return the operation result.

None
runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the operation and processes its output.

This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during operation execution.

SubgraphStep(name, subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BaseCompositeStep, HasInputsMixin

A pipeline step that executes another pipeline as a subgraph.

This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
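
Example (illustrative sketch; retrieval_pipeline and the state keys are hypothetical):

retrieval_step = SubgraphStep(
    name="retrieval",
    subgraph=retrieval_pipeline,                 # a separately constructed Pipeline
    input_state_map={"query": "user_question"},  # subgraph input <- parent state key
    output_state_map={"documents": "results"},   # parent state key <- subgraph output key
)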

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

subgraph Pipeline

The pipeline to be executed as a subgraph.

input_map dict[str, str | Val] | None

Unified input map.

output_state_map dict[str, str]

Mapping of parent pipeline state keys to subgraph output keys.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new SubgraphStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_state_map dict[str, str]

Mapping of subgraph input keys to parent pipeline state keys. Defaults to None.

None
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using the SDK's RetryConfig. If None, no retry policy is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to be used for this step. If None, no error handler is applied.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

apply_exclusions(exclusions)

Apply exclusions to this subgraph and its children.

The subgraph itself undergoes no internal structural changes; this step marks itself and uniformly propagates child exclusions to all subgraph steps.

Parameters:

Name Type Description Default
exclusions ExclusionSet

The exclusion set to apply.

required

execute(state, runtime) async

Executes the subgraph and processes its output.

This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None.

Raises:

Type Description
RuntimeError

If an error occurs during subgraph execution, with details about which step caused the error.

TerminatorStep(name, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BasePipelineStep

A step that connects previous steps to the END node.

This step is useful when you want to explicitly terminate a branch or the entire pipeline. It has no processing logic and simply acts as a connection point to the END node.

Example:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "terminate": TerminatorStep("early_end"),
            "continue": step_b
        },
        condition=lambda x: "terminate" if x["should_stop"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new pipeline step.

Parameters:

Name Type Description Default
name str

A unique identifier for the pipeline step.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. The RetryConfig is automatically converted to LangGraph's RetryPolicy when needed for internal use. Note that timeout is not supported and will be ignored.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

The cache store to use for caching step results. Defaults to None. If None, no caching will be used.

None
cache_config dict[str, Any] | None

Configuration for the cache store:

1. key_func: A function to generate cache keys. If None, the cache instance will use its own key function.
2. name: The name of the cache. If None, the name defaults to "step_{step_name}".
3. ttl: The time-to-live for the cache. If None, the cache will not have a TTL.
4. matching_strategy: The strategy for matching cache keys. If None, the cache instance will use "exact".
5. matching_config: Configuration for the matching strategy. If None, the cache instance will use its own default matching strategy configuration.

None
Caching Mechanism

When a cache_store is provided, the step's execution method is automatically wrapped with a cache decorator. This means:

1. Before execution, the cache is checked for existing results based on input parameters.
2. If a cached result exists and is valid, it is returned immediately.
3. If no cached result exists, the step executes normally and the result is cached.
4. Cache keys are generated from the step's input state and configuration.
5. The cache name defaults to "step_{step_name}" if not specified.

add_to_graph(graph, previous_endpoints, retry_policy=None)

Adds this step to the graph and connects it to the END node.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required
retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy. If None, the retry policy of the step is used. If the step is not a retryable step, this parameter is ignored.

None

Returns:

Type Description
list[str]

list[str]: Empty list as this step has no endpoints (it terminates the flow).

execute(state, runtime) async

Executes this step, which does nothing but pass through the state.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime[dict[str, Any] | BaseModel]

The runtime information.

required

bundle(input_states, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a StateOperatorStep to combine multiple state keys.

This function creates a StateOperatorStep that combines multiple keys from the pipeline state into a single output without modifying the data.

Usage example
bundle_step = bundle(["input1", "input2"], "output")
# Produces: {"output": {"input1": state["input1"], "input2": state["input2"]}}

This will cause the step to bundle the values of input1 and input2 from the pipeline state into a single dictionary. The result will be stored in the pipeline state under the key output.

Usage example (with remapping):

# Provide a mapping of desired output field names to source state keys
# Renames state key "user_id" to "id" in the bundled output
bundle_step = bundle({"id": "user_id"}, "output")
# Produces: {"output": {"id": state["user_id"]}}

Parameters:

Name Type Description Default
input_states list[str] | dict[str, str]
  1. If a list is provided, the listed state keys are bundled as-is (identity mapping).
  2. If a dict is provided, it is treated as a mapping of output field names to source state keys. The bundled result will use the dict keys as field names.
required
output_state str | list[str]

Key(s) to store the bundled data in the pipeline state.

required
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Bundle" followed by a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured to bundle the input states.

guard(condition, success_branch, failure_branch=None, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a GuardStep with a concise syntax.

This function creates a GuardStep that can terminate pipeline execution if a condition is not met.

Usage example
auth_check = lambda state: state["is_authenticated"]
success_step = step(SuccessHandler(), {"input": "input"}, "output")
error_step = step(ErrorHandler(), {"error": "auth_error"}, "error_message")


guard_step = guard(
    auth_check,
    success_branch=success_step,
    failure_branch=error_step,
    input_map={"user_id": "current_user", "model": "auth_model", "strict_mode": Val(True)},
    output_state="auth_result",
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach is deprecated in v0.5. Please use input_map instead.
guard_step = guard(
    auth_check,
    success_branch=success_step,
    failure_branch=error_step,
    input_state_map={"user_id": "current_user"},
    runtime_config_map={"model": "auth_model"},
    fixed_args={"strict_mode": True},
    output_state="auth_result"
)

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec

Direct unified input map. If provided, input_state_map, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from the input_state_map, runtime_config_map, and fixed_args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Guard" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
GuardStep GuardStep

An instance of GuardStep configured with the provided parameters.

if_else(condition, if_branch, else_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a simple ConditionalStep with two branches.

This function creates a ConditionalStep that executes one of two branches based on a condition.

The condition can be either:

1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (which will be converted to "true"/"false").

For boolean conditions and string conditions, True/true/TRUE maps to the if_branch and False/false/FALSE maps to the else_branch.

Usage example with a Callable condition
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["input"] > data["threshold"]

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    condition,
    if_branch,
    else_branch,
    output_state="condition_result",
    input_map={"threshold": Val(0)}
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach is deprecated in v0.5. Please use input_map instead.
if_else_step = if_else(
    condition,
    if_branch,
    else_branch,
    output_state="condition_result",
    fixed_args={"threshold": 0}
)

This will cause the step to execute the PositiveComponent if the input in the pipeline state is greater than the threshold (0), and the NegativeComponent otherwise. The result of the condition will be stored in the pipeline state under the key condition_result.

Usage example with a Component condition
# Using a Component condition - requires input_state_map and runtime_config_map
threshold_checker = ThresholdChecker()  # A Component that returns "true" or "false"

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    threshold_checker,
    if_branch,
    else_branch,
    output_state="condition_result",
    input_map={
        "value": "input",
        "threshold": "threshold_config",
        "strict_mode": Val(True),
    }
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach is deprecated in v0.5. Please use input_map instead.
if_else_step = if_else(
    threshold_checker,
    if_branch,
    else_branch,
    input_state_map={"value": "input"},
    output_state="condition_result",
    runtime_config_map={"threshold": "threshold_config"},
    fixed_args={"strict_mode": True}
)

This will cause the step to execute the ThresholdChecker component with the input from the pipeline state as its value parameter and the threshold_config from runtime configuration as its threshold parameter. Based on the component's result ("true" or "false"), it will execute either the PositiveComponent or the NegativeComponent.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
else_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is false.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec

Direct unified input map. If provided, input_state_map, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from those maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "IfElse" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache_store=None, cache_config=None)

Create a specialized step for logging messages.

This function creates a LogStep that logs messages within a pipeline. It can be used to log status updates, debugging information, or any other text during pipeline execution.

The message can be a plain string or a template with placeholders for state variables.

Usage example 1 (plain message):

log_step = log("Processing document", is_template=False)

Usage example 2 (template message with state variables):

log_step = log("Processing query: {query} with model: {model_name}")

Parameters:

Name Type Description Default
message str

The message to be logged. May contain placeholders in curly braces for state variables.

required
is_template bool

Whether the message is a template with placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional keyword arguments to pass to the event emitter. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. If None, a name will be auto-generated with the prefix "log_". Defaults to None.

None

Returns:

Name Type Description
LogStep LogStep

A specialized pipeline step for logging messages.

map_reduce(output_state, map_func, input_state_map=None, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a MapReduceStep that maps a function over multiple inputs and reduces the results.

This function creates a step that applies a mapping function to multiple inputs in parallel and combines the results using a reduction function.

The map_func receives a dictionary for each item being processed. This dictionary contains:

1. Values from input_state_map (with list inputs split into individual items).
2. Values from runtime_config_map (if provided).
3. Values from fixed_args (if provided).

The map_func can be either:

1. A callable function that takes a dictionary as input and returns a result.
2. A Component instance, which will be executed with proper async handling.

Important note on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.

The step supports automatic broadcasting of scalar values and handles lists appropriately:

1. If multiple list inputs are provided, they must be the same length.
2. Scalar inputs are broadcast to match list lengths.

Usage Example - Processing a list of items with an async map function
async def count_words(item):
    await asyncio.sleep(0.1)  # Simulate I/O operation
    return len(item["document"].split())

process_docs = map_reduce(
    input_state_map={
        "document": "documents" # A list, e.g. ["doc1...", "doc2...", "doc3..."]
    },
    output_state="word_counts", # A list of word counts for each document
    map_func=count_words,
    reduce_func=lambda results: sum(results), # Sum word counts
)

# When executed with {"documents": ["doc1...", "doc2...", "doc3..."]},
# returns {"word_counts": 60} (total word count)
Usage Example - Broadcasting scalar values to match list length
# Apply a common threshold to multiple values
threshold_check = map_reduce(
    input_state_map={
        "value": "values",          # A list: [5, 10, 15]
        "threshold": "threshold",   # A scalar: 8 (will be broadcast)
    },
    output_state="above_threshold",
    map_func=lambda item: item["value"] > item["threshold"],
    reduce_func=lambda results: results  # Return list of boolean results
)
# When executed with {"values": [5, 10, 15], "threshold": 8},
# returns {"above_threshold": [False, True, True]}
Usage Example - Multiple list inputs with the same length
similarity_step = map_reduce(
    input_state_map={
        "doc1": "documents_a",  # ["doc1", "doc2", "doc3"]
        "doc2": "documents_b",  # ["docA", "docB", "docC"]
    },
    output_state="similarity_scores",
    map_func=lambda item: calculate_similarity(item["doc1"], item["doc2"]),
    reduce_func=lambda results: sum(results) / len(results)  # Average similarity
)
# When executed with {"documents_a": ["doc1", "doc2", "doc3"], "documents_b": ["docA", "docB", "docC"]},
# returns {"similarity_scores": 0.75}
Usage Example - Using a Component for complex processing instead of a map function
summarizer = TextSummarizer() # Subclass of Component
summarize_step = map_reduce(
    input_state_map={
        "text": "documents",            # List of documents to summarize
        "max_length": "max_length",     # Scalar parameter (broadcasted)
    },
    output_state="summaries",
    map_func=summarizer,
    reduce_func=lambda results: [r["summary"] for r in results]
)
# When executed with {"documents": ["doc1...", "doc2..."], "max_length": 50},
# returns {"summaries": ["summary1...", "summary2..."]}

Parameters:

Name Type Description Default
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
input_state_map dict[str, str] | None

Mapping of function arguments to pipeline state keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
runtime_config_map dict[str, str] | None

Mapping of arguments to runtime config keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, input_state_map, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from the input_state_map, runtime_config_map, and fixed_args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this step. Defaults to None, in which case the name will be "MapReduce" followed by the map function name.

None

Returns:

Name Type Description
MapReduceStep MapReduceStep

An instance of MapReduceStep configured with the provided parameters.

no_op(name=None)

Create a NoOpStep to add a step that does nothing.

This function creates a NoOpStep, a pass-through step that leaves the pipeline state unchanged.
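
A no-op is useful as an explicit do-nothing branch in a conditional. A minimal sketch, assuming hypothetical steps step_a, step_b, step_c and a hypothetical feature_enabled condition, composed with this module's if_else helper:

```python
# Explicit "do nothing" branch: when feature_enabled is false, the
# pipeline passes the state through unchanged.
skip_step = no_op()

pipeline = (
    step_a
    | if_else(feature_enabled, step_b, skip_step)
    | step_c
)
```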

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "NoOp" followed by a unique identifier.

None

Returns:

Name Type Description
NoOpStep NoOpStep

An instance of NoOpStep.

parallel(branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, input_map=None, error_handler=None, cache_store=None, cache_config=None, retry_config=None, name=None)

Create a ParallelStep that executes multiple branches concurrently.

This function creates a ParallelStep that runs multiple branches in parallel and merges their results. Each branch can be a single step or a list of steps to execute sequentially.

The step supports two execution modes, controlled by the squash parameter:
1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. Characteristics:
   a. Better raw performance
   b. Simpler implementation
   c. Less overhead
   d. Less transparent for debugging and tracing
2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. Characteristics:
   a. More native LangGraph integration
   b. More transparent for debugging and tracing

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.

Usage example:
1. Define branches as a list of steps or lists of steps
```python
parallel_step = parallel(
    branches=[
        step(ComponentA(), {"input": "query"}, "output_a"),
        [
            step(ComponentB1(), {"input": "query"}, "output_b1"),
            step(ComponentB2(), {"input": "output_b1"}, "output_b2"),
        ],
        step(ComponentC(), {"input": "query"}, "output_c"),
    ],
    input_states=["query"],  # Only 'query' will be passed to branches
)
```

2. Define branches as a dictionary of branches
Besides the list format, branches can also be provided as a dictionary, which makes it easier to exclude individual branches.
```python
parallel_step = parallel(
    branches={
        "branch_a": step(ComponentA(), {"input": "query"}, "output_a"),
        "branch_b": step(ComponentB(), {"input": "query"}, "output_b"),
    },
    input_states=["query"],
)
```
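
When transparency in debugging and tracing matters more than raw throughput, the same branches can be expanded into native LangGraph paths by setting squash=False; a minimal sketch reusing the hypothetical components from the examples above:

```python
parallel_step = parallel(
    branches={
        "branch_a": step(ComponentA(), {"input": "query"}, "output_a"),
        "branch_b": step(ComponentB(), {"input": "query"}, "output_b"),
    },
    input_states=["query"],
    squash=False,  # Expand into native LangGraph parallel paths
)
```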

Parameters:

Name Type Description Default
input_states list[str] | None

Keys from the state to pass to branches. Defaults to None, in which case all state keys will be passed. Will be deprecated in v0.5. Please use input_map instead.

None
squash bool

Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel. If False, uses native LangGraph structures for parallelism. Defaults to True.

True
runtime_config_map dict[str, str] | None

Mapping of input keys to runtime config keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to include in the state passed to branches. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, input_states, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from input_states, runtime_config_map, and fixed_args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this parallel step. Defaults to None, in which case a name will be auto-generated.

None

Returns:

Name Type Description
ParallelStep ParallelStep

An instance of ParallelStep configured with the provided branches.

step(component, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, emittable=True, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)

Create a ComponentStep with a concise syntax.

This function creates a ComponentStep, which wraps a component and manages its inputs and outputs within the pipeline.

Usage example

We can leverage the input_map parameter to specify both state/config keys (as strings) and fixed values (wrapped in Val) in a single dictionary.

retriever = Retriever()
retriever_step = step(
    retriever,
    input_map={
        "query": "user_input",
        "top_k": "config_top_k",
        "conversation_id": Val("<fixed_value>"),
    },
    output_state="retrieved_data",
)

This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the user_input from the pipeline state to the query argument of the Retriever.
2. It will pass the config_top_k from the runtime configuration to the top_k argument of the Retriever.
3. It will pass the fixed value <fixed_value> to the conversation_id argument of the Retriever.
4. It will store the Retriever result in the pipeline state under the key retrieved_data.

Legacy approach (will be deprecated in v0.5, please use input_map instead):
```python
retriever = Retriever()
retriever_step = step(retriever, {"query": "input_query"}, "retrieved_data")
```
This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the input_query from the pipeline state to the query argument of the Retriever.
2. It will store the Retriever result in the pipeline state under the key retrieved_data.

Parameters:

Name Type Description Default
component Component

The component to be executed in this step.

required
input_state_map dict[str, str] | None

Mapping of component input arguments to pipeline state keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of component arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, input_state_map, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from the input_state_map, runtime_config_map, and fixed_args. Defaults to None.

None
emittable bool

Whether an event emitter should be passed to the component, if available in the state and not explicitly provided in any of the arguments. Defaults to True.

True
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be the component's class name followed by a unique identifier.

None

Returns:

Name Type Description
ComponentStep ComponentStep

An instance of ComponentStep configured with the provided parameters.

subgraph(subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a SubgraphStep that executes another pipeline as a subgraph.

This function creates a SubgraphStep that allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.

The SubgraphStep gracefully handles missing state keys - if a key specified in input_state_map is not present in the parent state, it will be omitted from the subgraph input rather than causing an error. This allows for flexible composition of pipelines with different state schemas.

Usage example
from typing import TypedDict
from gllm_pipeline.pipeline.pipeline import Pipeline

# Define state schemas using TypedDict
class SubgraphState(TypedDict):
    query: str
    retrieved_data: list
    reranked_data: list

class ParentState(TypedDict):
    user_input: str
    query: str
    reranked: list
    response: str

# Define a subgraph pipeline with its own state schema
subgraph_pipeline = Pipeline(
    [
        step(Retriever(), {"query": "query"}, "retrieved_data"),
        step(Reranker(), {"data": "retrieved_data"}, "reranked_data")
    ],
    state_type=SubgraphState
)

# Use the subgraph in a parent pipeline
parent_pipeline = Pipeline(
    [
        step(QueryProcessor(), {"input": "user_input"}, "query"),
        subgraph(
            subgraph_pipeline,
            input_map={"query": "query", "model": "retrieval_model", "top_k": Val(10)},
            output_state_map={"reranked": "reranked_data"},
        ),
        step(ResponseGenerator(), {"data": "reranked"}, "response")
    ],
    state_type=ParentState
)

# or use the legacy approach via input_state_map, output_state_map, runtime_config_map, and fixed_args
parent_pipeline = Pipeline(
    [
        step(QueryProcessor(), {"input": "user_input"}, "query"),
        subgraph(
            subgraph_pipeline,
            input_state_map={"query": "query"},  # Map parent state to subgraph input
            output_state_map={"reranked": "reranked_data"},  # Map subgraph output to parent state
            runtime_config_map={"model": "retrieval_model"},
            fixed_args={"top_k": 10},
        ),
        step(ResponseGenerator(), {"data": "reranked"}, "response")
    ],
    state_type=ParentState
)

# When the parent pipeline runs:
# 1. QueryProcessor processes user_input and produces query
# 2. SubgraphStep creates a new state for the subgraph with query from parent
# 3. Subgraph executes its steps (Retriever → Reranker)
# 4. SubgraphStep maps reranked_data from subgraph to reranked in parent
# 5. ResponseGenerator uses reranked to produce response

Parameters:

Name Type Description Default
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_state_map dict[str, str] | None

Mapping of subgraph input keys to parent pipeline state keys. Keys that don't exist in the parent state will be gracefully ignored. If None, all subgraph inputs will be passed as-is. Will be deprecated in v0.5. Please use input_map instead.

None
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. If None, all subgraph outputs will be passed as-is.

None
runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, it is used directly; otherwise it will be synthesized from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier.

None

Returns:

Name Type Description
SubgraphStep SubgraphStep

An instance of SubgraphStep configured with the provided parameters.

switch(condition, branches, input_state_map=None, output_state=None, default=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a ConditionalStep with multiple branches.

This function creates a ConditionalStep that can execute one of multiple branches based on a condition.

Usage example with a Callable condition
# Using a Callable condition - receives merged state and config directly
def extract_command(data):
    # Access both state and config in a single dictionary
    query = data["query"]
    separator = data["separator"]  # From runtime config or state
    return query.split(separator)[0]

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
}
default = step(NoOpComponent(), {}, "no_op_result")

switch_step = switch(
    extract_command,
    branches,
    input_map={"separator": Val(" ")},
    output_state="command_type",
    default=default,
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach will be deprecated in v0.5. Please use input_map instead.
switch_step = switch(
    extract_command,
    branches,
    # input_state_map and runtime_config_map are ignored for Callable conditions
    # but can still be specified (they will have no effect)
    output_state="command_type",
    default=default,
    fixed_args={"separator": " "}  # This will be merged with state and config
)

This will cause the step to execute the SearchComponent if the first part of the query in the pipeline state is "search", the FilterComponent if it is "filter", and the NoOpComponent otherwise. The separator is provided as a fixed argument. The result of the condition will be stored in the pipeline state under the key command_type.

Usage example with a Component condition
# Using a Component condition - requires input_state_map and runtime_config_map
command_extractor = CommandExtractor()  # A Component that extracts command from query

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
    "sort": step(SortComponent(), {"query": "query"}, "sort_result"),
}
default = step(DefaultComponent(), {"query": "query"}, "default_result")

switch_step = switch(
    command_extractor,
    branches,
    input_map={"text": "query", "delimiter": "separator_config", "lowercase": Val(True)},
    output_state="command_type",
    default=default,
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach will be deprecated in v0.5. Please use input_map instead.
switch_step = switch(
    command_extractor,
    branches,
    input_state_map={"text": "query"},  # Maps pipeline state to component input
    output_state="command_type",
    default=default,
    runtime_config_map={"delimiter": "separator_config"},  # Maps runtime config to component input
    fixed_args={"lowercase": True}  # Fixed arguments passed directly to component
)

This will cause the step to execute the CommandExtractor component with the query from the pipeline state as its text parameter and the separator_config from runtime configuration as its delimiter parameter. Based on the component's result (which should be one of "search", "filter", "sort", or something else), it will execute the corresponding branch component or the default component.

Parameters:

Name Type Description Default
condition ConditionType

The condition to evaluate for branch selection.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
default BasePipelineStep | list[BasePipelineStep] | None

Default branch to execute if no condition matches. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, input_state_map, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from the input_state_map, runtime_config_map, and fixed_args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Switch" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

terminate(name=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Create a TerminatorStep to end pipeline execution.

This function creates a TerminatorStep that explicitly terminates a branch or the entire pipeline.

Usage example
early_exit = terminate("early_exit")

pipeline = (
    step_a
    | if_else(should_stop, early_exit, step_b)
    | step_c
)

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Terminator" followed by a unique identifier.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
TerminatorStep TerminatorStep

An instance of TerminatorStep.

toggle(condition, if_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a ConditionalStep that toggles between a branch and a no-op.

This function creates a ConditionalStep that executes a branch if the condition is true, and does nothing (no-op) if the condition is false.

The condition can be:
1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (will be converted to "true"/"false").
4. A string key that will be looked up in the merged state data (state + runtime config + fixed args). The value will be evaluated for truthiness: any non-empty, non-zero, non-False value is considered True (see the sketch below).
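
For the string-key form (case 4), the condition is simply the name of a key in the merged state data; a minimal sketch, assuming a hypothetical boolean state key feature_enabled and reusing FeatureComponent from the examples below:

```python
# "feature_enabled" is looked up in the merged state data and evaluated
# for truthiness; no condition function or input mapping is needed.
toggle_step = toggle(
    "feature_enabled",
    step(FeatureComponent(), {"input": "input"}, "output"),
    output_state="feature_status",
)
```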

Usage example with a Callable condition
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["feature_enabled"] and data["user_tier"] >= 2
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    condition,
    feature_step,
    output_state="feature_status",
    input_map={"user_tier": Val(2)},
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach will be deprecated in v0.5. Please use input_map instead.
toggle_step = toggle(
    condition,
    feature_step,
    output_state="feature_status",
    fixed_args={"user_tier": 2}  # This will be merged with state and config
)

This will execute the FeatureComponent only if both feature_enabled is true and user_tier is at least 2. Otherwise, it will do nothing. The condition result will be stored in the pipeline state under the key feature_status.

Usage example with a Component condition
# Using a Component condition - requires input_state_map and runtime_config_map
feature_checker = FeatureChecker()  # A Component that returns "true" or "false"
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    feature_checker,
    feature_step,
    output_state="feature_status",
    input_map={"user_id": "current_user", "feature_name": "target_feature", "check_permissions": Val(True)},
)

# or use the legacy approach via input_state_map, runtime_config_map, and fixed_args
# Note: this approach will be deprecated in v0.5. Please use input_map instead.
toggle_step = toggle(
    feature_checker,
    feature_step,
    input_state_map={"user_id": "current_user"},  # Maps pipeline state to component input
    output_state="feature_status",
    runtime_config_map={"feature_name": "target_feature"},  # Maps runtime config to component input
    fixed_args={"check_permissions": True}  # Fixed arguments passed directly to component
)

This will cause the step to execute the FeatureChecker component with the current_user from the pipeline state as its user_id parameter and the target_feature from runtime configuration as its feature_name parameter. Based on the component's result ("true" or "false"), it will either execute the FeatureComponent or do nothing.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool] | str

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, it is used directly; otherwise it will be synthesized from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Toggle" followed by a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

transform(operation, input_states=None, output_state=None, runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a StateOperatorStep for transforming state data.

This function creates a StateOperatorStep that applies a transformation operation to the pipeline state. Note that the function operation should accept a dictionary of input data and return the operation result.

Usage example
def sort(data: dict) -> list:
    is_reverse = data["reverse"]
    return sorted(data["chunk"], reverse=is_reverse)

transform_step = transform(
    operation=sort,
    input_map=["chunk", {"reverse": Val(True)}],
    output_state="output",
)

# or use the legacy approach via input_states and runtime_config_map
transform_step = transform(
    operation=sort,
    input_states=["chunk"],
    output_state="output",
    runtime_config_map={"reverse": "reverse_config"},
)

This will cause the step to execute the sort operation on the chunk in the pipeline state and store the result under the key output. In the input_map variant, the reverse flag is fixed to True via Val; in the legacy variant, it is read from the runtime configuration key reverse_config.

Parameters:

Name Type Description Default
operation Callable[[dict[str, Any]], Any]

The operation to execute on the input data.

required
input_states list[str] | None

List of input state keys required by the operation. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
output_state str | list[str] | None

Key(s) to store the operation result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation. Defaults to None. Will be deprecated in v0.5. Please use input_map instead.

None
input_map InputMapSpec | None

Direct unified input map. If provided, input_states, runtime_config_map, and fixed_args will be ignored; otherwise it will be synthesized from input_states, runtime_config_map, and fixed_args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Transform" followed by the operation's function name and a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured with the provided parameters.