Steps

Contains the base classes and implementations of pipeline steps.

ComponentStep(name, component, input_state_map, output_state=None, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes a specific component.

This step wraps a component, manages its inputs and outputs, and integrates it into the pipeline.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

component Component

The component to be executed in this step.

input_state_map dict[str, str]

Mapping of component input arguments to pipeline state keys.

output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed.

runtime_config_map dict[str, str] | None

Mapping of component input arguments to runtime configuration keys.

fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component.

Initializes a new ComponentStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
component Component

The component to be executed in this step.

required
input_state_map dict[str, str]

Mapping of component input arguments to pipeline state keys. Keys are input arguments expected by the component, values are corresponding state keys.

required
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of component input arguments to runtime configuration keys. Keys are input arguments expected by the component, values are runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None.

None
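The mapping parameters above can be sketched in plain Python. The helper below is a hypothetical illustration (not the library's implementation) of how a ComponentStep might assemble the component's keyword arguments; the merge order shown (fixed_args, then runtime config, then state) is an assumption.

```python
def build_component_kwargs(state, runtime_config, input_state_map,
                           runtime_config_map=None, fixed_args=None):
    """Sketch of how a ComponentStep could assemble component kwargs.

    Assumed precedence (lowest to highest): fixed_args, runtime config
    values, pipeline state values.
    """
    kwargs = dict(fixed_args or {})
    # Pull runtime configuration values into the component's arguments.
    for arg, cfg_key in (runtime_config_map or {}).items():
        kwargs[arg] = runtime_config[cfg_key]
    # Pull pipeline state values into the component's arguments.
    for arg, state_key in input_state_map.items():
        kwargs[arg] = state[state_key]
    return kwargs

state = {"user_query": "hello"}
config = {"model_name": "gpt-x"}
kwargs = build_component_kwargs(
    state, config,
    input_state_map={"query": "user_query"},
    runtime_config_map={"model": "model_name"},
    fixed_args={"temperature": 0.0},
)
```

Here the component would be invoked with query="hello", model="gpt-x", and temperature=0.0.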

execute(state, config) async

Executes the component and processes its output.

This method validates inputs, prepares data, executes the component, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The update to the pipeline state after this step's operation, or None if output_state is None. When not None, this includes new or modified data produced by the component, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during component execution.

TimeoutError

If the component execution times out.

CancelledError

If the component execution is cancelled.

ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that conditionally executes different branches based on specified conditions.

This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.

A minimal usage requires defining the branches to execute based on a condition, which is a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs: 1. If the condition is a Component, input_state_map and runtime_config_map are used to map the pipeline's state and config to the component's inputs. 2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. input_state_map and runtime_config_map are ignored in this case.

Example:

ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)

This will execute step_a if the query contains "<A>", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches, the step will raise an error.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps.

condition list[ConditionType] | None

The condition(s) to evaluate for branch selection.

input_state_map dict[str, str] | None

A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

output_state str | None

Key to store the condition result in the state, if desired.

condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results.

runtime_config_map dict[str, str] | None

A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition evaluation.

Initializes a new ConditionalStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
condition ConditionType | list[ConditionType] | None

The condition(s) to evaluate for branch selection. If a Callable, it receives the merged state and config as keyword arguments. If None, the condition is evaluated from the state. Defaults to None.

None
input_state_map dict[str, str] | None

A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
output_state str | None

Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None.

None
condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";").

lambda x: ';'.join(str(i) for i in x)
runtime_config_map dict[str, str] | None

A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None

add_to_graph(graph, previous_endpoints)

Integrates this step into the pipeline's internal structure.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

This method always creates a condition node that: 1. Executes the condition and stores its result if output_state is defined 2. Acts as a pass-through router if no output_state is defined 3. Connects to the appropriate branch based on condition evaluation

Parameters:

Name Type Description Default
graph StateGraph

The internal representation of the pipeline structure.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required

Returns:

Type Description
list[str]

list[str]: The exit points (endpoints) of all non-terminating branches.

execute(state, config) async

Executes the conditional step, determines the route, and returns a Command.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Name Type Description
Command Command

A LangGraph Command object with 'goto' for routing and 'update' for state changes.

execute_direct(state, config) async

Execute this step directly, handling both branch selection and execution.

This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Updates to apply to the pipeline state, or None if no updates.

get_all_steps()

Gets all steps from all branches.

Returns:

Type Description
list[BasePipelineStep]

list[BasePipelineStep]: A list of all steps in all branches.

get_mermaid_diagram()

Create a Mermaid diagram representation of the conditional step.

Returns:

Name Type Description
str str

The complete Mermaid diagram representation of the conditional branches.

select_path(state, config) async

Determines the logical route key based on the evaluated condition(s).

This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Name Type Description
str str

The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key.

GuardStep(name, condition, success_branch, failure_branch=None, **kwargs)

Bases: ConditionalStep

A conditional step that can terminate pipeline execution if a condition is not met.

This step evaluates a condition and either: 1. Continues execution through the success_branch if the condition is True 2. Executes the failure_branch and terminates if the condition is False

Example:

pipeline = (
    step_a
    | GuardStep(
        name="auth_check",
        condition=lambda x: x["is_authenticated"],
        success_branch=step_b,
        failure_branch=error_handling_step,
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

condition ConditionType

The condition to evaluate.

success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately on False condition.

Initializes a new GuardStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
condition ConditionType

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
**kwargs Any

Additional arguments to pass to ConditionalStep.

{}

LogStep(name, message, is_template=True, emit_kwargs=None)

Bases: BasePipelineStep

A specialized pipeline step for logging messages.

This step uses the Messenger component to log messages during pipeline execution. It supports both plain text messages and template messages with placeholders for state variables.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

messenger Messenger

The messenger component used to format and send messages.

emit_kwargs dict[str, Any]

Additional arguments to pass to the event emitter.

Initializes a new LogStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
message str

The message to be logged, may contain placeholders enclosed in curly braces.

required
is_template bool

Whether the message contains placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional arguments to pass to the event emitter. Defaults to None.

None
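The template behaviour can be illustrated with a small sketch. The function below is a hypothetical stand-in for what the Messenger component does with a template message: placeholders in curly braces are filled from the pipeline state.

```python
def format_log_message(message, state, is_template=True):
    # When is_template is True, placeholders like {query} are filled
    # from the pipeline state; otherwise the message is emitted verbatim.
    return message.format(**state) if is_template else message

formatted = format_log_message(
    "Retrieved {n} documents for: {query}",
    {"n": 3, "query": "pipelines"},
)
```

With is_template=False, the braces would be left untouched, which is useful when the message itself contains literal curly braces.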

execute(state, config) async

Executes the log step by formatting and emitting the message.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

Raises:

Type Description
RuntimeError

If an error occurs during message emission.

MapReduceStep(name, input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using: 1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args). 2. A reduce function that combines all the mapped results.

Note on parallel execution: 1. For true parallelism, the map_func MUST be an async function or a Component. 2. Synchronous map functions will block the event loop and run sequentially.

Input handling: 1. Automatically detects which inputs are lists/sequences. 2. Ensures all list inputs have the same length. 3. Broadcasts scalar values to match list lengths. 4. If no list inputs, applies the map function once to the whole input.

Internally, this step uses asyncio.gather() for efficient parallel execution.

Attributes:

Name Type Description
name str

A unique identifier for this step.

map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. Will be run in parallel if the function is an asynchronous function.

reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results.

input_state_map dict[str, str]

Mapping of function arguments to pipeline state keys.

output_state str

Key to store the reduced result in the pipeline state.

runtime_config_map dict[str, str] | None

Mapping of function arguments to runtime config keys.

fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions.

Initialize a new MapReduceStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step.

required
input_state_map dict[str, str]

Mapping of function arguments to pipeline state keys.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
runtime_config_map dict[str, str] | None

Mapping of arguments to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions. Defaults to None.

None
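The broadcasting and gathering behaviour described above can be sketched in plain asyncio. This is a simplified, hypothetical model of the step's semantics (list detection, length checking, scalar broadcasting, asyncio.gather), not the library's code.

```python
import asyncio

async def map_reduce(inputs, map_func, reduce_func=lambda r: r):
    # Detect which inputs are lists/tuples and broadcast scalars to match.
    list_keys = [k for k, v in inputs.items() if isinstance(v, (list, tuple))]
    if not list_keys:
        # No list inputs: apply the map function once to the whole input.
        return reduce_func([await map_func(inputs)])
    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError("all list inputs must have the same length")
    n = lengths.pop()
    # Build one input dict per item, broadcasting scalar values.
    items = [
        {k: (v[i] if k in list_keys else v) for k, v in inputs.items()}
        for i in range(n)
    ]
    # An async map_func runs concurrently under asyncio.gather().
    results = await asyncio.gather(*(map_func(item) for item in items))
    return reduce_func(list(results))

async def scale(item):
    return item["x"] * item["scale"]

result = asyncio.run(
    map_reduce({"x": [1, 2, 3], "scale": 10}, scale, reduce_func=sum)
)
```

Here "x" is a list and "scale" is broadcast to every item, so the map produces [10, 20, 30] and the reduce sums them to 60.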

execute(state, config) async

Execute the map and reduce operations.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The reduced result stored under output_state.

Raises:

Type Description
RuntimeError

If an error occurs during execution.

NoOpStep(name)

Bases: BasePipelineStep

A step that does nothing.

This step is useful when you want to add a step that does not perform any processing. For example, you can use this step to implement a toggle pattern for a certain component.

Example:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "execute": step_b,
            "continue": NoOpStep("no_op")
        },
        condition=lambda x: "execute" if x["should_execute"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

Initializes a new pipeline step.

Parameters:

Name Type Description Default
name str

A unique identifier for the pipeline step.

required

execute(state, config) async

Executes this step, which does nothing.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes multiple branches in parallel.

This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.

The step supports two execution modes controlled by the squash parameter: 1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. This offers better raw performance, a simpler implementation, and less overhead, but is less transparent for debugging and tracing. 2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. This offers more native LangGraph integration and is more transparent for debugging and tracing.

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches list[BasePipelineStep | list[BasePipelineStep]]

The branches to execute in parallel.

squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes.

input_states list[str] | None

Keys from the state that should be passed to branches. If None, all state keys will be passed.

Initialize a new ParallelStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches list[BasePipelineStep | list[BasePipelineStep]]

The branches to execute in parallel. Each branch can be: 1. A single step 2. A list of steps to execute sequentially

required
input_states list[str] | None

Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None.

None
squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes. Defaults to True.

True
runtime_config_map dict[str, str] | None

Mapping of input keys to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used.

None
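The squashed mode can be sketched as follows. This is a hypothetical simplification of the semantics: every branch runs concurrently on a copy of the state, and the branch updates are merged into a single dict. The conflict rule shown (later branches win on duplicate keys) is an assumption for illustration.

```python
import asyncio

async def run_parallel(branches, state):
    # Squashed mode: run every branch concurrently within one node.
    updates = await asyncio.gather(
        *(branch(dict(state)) for branch in branches)
    )
    # Merge the per-branch state updates; assume later branches win
    # on key conflicts.
    merged = {}
    for update in updates:
        if update:
            merged.update(update)
    return merged or None

async def branch_a(state):
    return {"a": state["x"] + 1}

async def branch_b(state):
    return {"b": state["x"] * 2}

merged = asyncio.run(run_parallel([branch_a, branch_b], {"x": 3}))
```

Both branches read the same input state; their independent updates are combined into one state update for the pipeline.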

add_to_graph(graph, previous_endpoints)

Add this step to the graph.

Uses either squashed or expanded approach based on the squash parameter.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required

Returns:

Type Description
list[str]

list[str]: The endpoint(s) of this step.

execute(state, config) async

Execute all branches in parallel and merge their results.

This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced.

Raises:

Type Description
CancelledError

If execution is cancelled, preserved with added context.

BaseInvokerError

If an error occurs during LM invocation.

RuntimeError

For all other exceptions during execution, wrapped with context information.

TimeoutError

If execution times out, preserved with added context.

ValidationError

If input validation fails.

get_mermaid_diagram()

Combines the base diagram and nested step diagrams into a complete visualization.

If the parallel step is squashed, custom logic is used to represent the structure. If the parallel step is not squashed, the structure is created natively from LangGraph nodes.

Returns:

Name Type Description
str str

Complete Mermaid diagram for this parallel step.

StateOperatorStep(name, input_states, output_state, operation, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that performs an operation on the pipeline state and updates it.

This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

input_states list[str]

Keys of the state data required by the operation.

output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

operation Callable[[dict[str, Any]], Any]

The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration.

runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys.

fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation.

Initializes a new StateOperatorStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
input_states list[str]

Keys of the state data required by the operation.

required
output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

required
operation Callable[[dict[str, Any]], Any]

The operation to execute. It should accept a dictionary of input data and return the operation result.

required
runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation. Defaults to None.

None
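The extract-operate-update cycle can be sketched in plain Python. This is a hypothetical model of the step's behaviour, not the library's implementation; the handling of multiple output keys (assuming the operation returns one value per key) is an assumption.

```python
def run_state_operation(state, input_states, output_state, operation,
                        fixed_args=None):
    # Extract only the requested keys, add any fixed arguments,
    # run the operation, and return a state *update* (not the full state).
    inputs = {k: state[k] for k in input_states}
    inputs.update(fixed_args or {})
    result = operation(inputs)
    if isinstance(output_state, str):
        return {output_state: result}
    # Multiple output keys: assume the operation returns one value per key.
    return dict(zip(output_state, result))

update = run_state_operation(
    {"a": 2, "b": 3, "other": "ignored"},
    input_states=["a", "b"],
    output_state="sum",
    operation=lambda d: d["a"] + d["b"],
)
```

Only the keys named in input_states reach the operation; the returned dict contains just the new data to merge into the pipeline state.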

execute(state, config) async

Executes the operation and processes its output.

This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during operation execution.

SubgraphStep(name, subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes another pipeline as a subgraph.

This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

subgraph Pipeline

The pipeline to be executed as a subgraph.

input_state_map dict[str, str]

Mapping of subgraph input keys to parent pipeline state keys.

output_state_map dict[str, str]

Mapping of parent pipeline state keys to subgraph output keys.

runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys.

fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph.

Initializes a new SubgraphStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_state_map dict[str, str]

Mapping of subgraph input keys to parent pipeline state keys.

None
output_state_map dict[str, str]

Mapping of parent pipeline state keys to subgraph output keys.

None
runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used.

None
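The two-way state mapping can be sketched as follows. This hypothetical helper models the documented behaviour: input keys missing from the parent state are skipped, and output keys missing from the subgraph result map to None.

```python
def map_subgraph_states(parent_state, input_state_map, output_state_map,
                        run_subgraph):
    # Build the subgraph input from the parent state; input_state_map
    # maps subgraph input keys to parent state keys. Missing parent
    # keys are silently skipped.
    sub_input = {
        sub_key: parent_state[parent_key]
        for sub_key, parent_key in input_state_map.items()
        if parent_key in parent_state
    }
    result = run_subgraph(sub_input)
    # output_state_map maps parent state keys to subgraph output keys;
    # keys absent from the subgraph result become None.
    return {
        parent_key: result.get(sub_key)
        for parent_key, sub_key in output_state_map.items()
    }

# run_subgraph here is a stand-in for executing the nested Pipeline.
update = map_subgraph_states(
    {"question": "hi", "extra": 1},
    input_state_map={"query": "question"},
    output_state_map={"final_answer": "answer"},
    run_subgraph=lambda s: {"answer": s["query"].upper()},
)
```

The parent's "question" is renamed to the subgraph's "query" on the way in, and the subgraph's "answer" is stored back under "final_answer".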

execute(state, config) async

Executes the subgraph and processes its output.

This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None.

Raises:

Type Description
RuntimeError

If an error occurs during subgraph execution, with details about which step caused the error.

get_mermaid_diagram()

Create a Mermaid diagram representation of the subgraph.

Returns:

Name Type Description
str str

The complete Mermaid diagram representation of the subgraph.

TerminatorStep(name)

Bases: BasePipelineStep

A step that connects previous steps to the END node.

This step is useful when you want to explicitly terminate a branch or the entire pipeline. It has no processing logic and simply acts as a connection point to the END node.

Example:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "terminate": TerminatorStep("early_end"),
            "continue": step_b
        },
        condition=lambda x: "terminate" if x["should_stop"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

Initializes a new pipeline step.

Parameters:

Name Type Description Default
name str

A unique identifier for the pipeline step.

required

add_to_graph(graph, previous_endpoints)

Adds this step to the graph and connects it to the END node.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required

Returns:

Type Description
list[str]

list[str]: Empty list as this step has no endpoints (it terminates the flow).

execute(state, config) async

Executes this step, which does nothing but pass through the state.

Parameters:

Name Type Description Default
state dict[str, Any]

The current pipeline state.

required
config RunnableConfig

The runtime configuration.

required

bundle(input_states, output_state, name=None)

Create a StateOperatorStep to combine multiple state keys.

This function creates a StateOperatorStep that combines multiple keys from the pipeline state into a single output without modifying the data.

Usage example:

bundle_step = bundle(["input1", "input2"], "output")

This will cause the step to bundle the values of input1 and input2 from the pipeline state into a single dictionary. The result will be stored in the pipeline state under the key output.

Parameters:

Name Type Description Default
input_states list[str]

List of input state keys to be bundled.

required
output_state str | list[str]

Key(s) to store the bundled data in the pipeline state.

required
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Bundle" followed by a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured to bundle the input states.

guard(condition, success_branch, failure_branch=None, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)

Create a GuardStep with a concise syntax.

This function creates a GuardStep that can terminate pipeline execution if a condition is not met.

Usage example:

auth_check = lambda state: state["is_authenticated"]
success_step = step(SuccessHandler(), {"input": "input"}, "output")
error_step = step(ErrorHandler(), {"error": "auth_error"}, "error_message")

guard_step = guard(
    auth_check,
    success_branch=success_step,
    failure_branch=error_step,
    input_state_map={"user_id": "current_user"},
    output_state="auth_result"
)

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Guard" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
GuardStep GuardStep

An instance of GuardStep configured with the provided parameters.

if_else(condition, if_branch, else_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)

Create a simple ConditionalStep with two branches.

This function creates a ConditionalStep that executes one of two branches based on a condition.

The condition can be one of the following: 1. A Component that must return exactly "true" or "false" 2. A callable that returns a string ("true" or "false", case insensitive) 3. A callable that returns a boolean (converted to "true"/"false")

For boolean conditions and string conditions, True/true/TRUE maps to the if_branch and False/false/FALSE maps to the else_branch.

Usage example with a Callable condition:

# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["input"] > data["threshold"]

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    condition,
    if_branch,
    else_branch,
    # input_state_map and runtime_config_map are ignored for Callable conditions,
    # but can still be specified (they will have no effect)
    output_state="condition_result",
    fixed_args={"threshold": 0}  # This will be merged with state and config
)

This will cause the step to execute the PositiveComponent if the input in the pipeline state is greater than the threshold (0), and the NegativeComponent otherwise. The result of the condition will be stored in the pipeline state under the key condition_result.

Usage example with a Component condition:

# Using a Component condition - requires input_state_map and runtime_config_map
threshold_checker = ThresholdChecker()  # A Component that returns "true" or "false"

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    threshold_checker,
    if_branch,
    else_branch,
    input_state_map={"value": "input"},  # Maps pipeline state to component input
    output_state="condition_result",
    runtime_config_map={"threshold": "threshold_config"},  # Maps runtime config to component input
    fixed_args={"strict_mode": True}  # Fixed arguments passed directly to component
)

This will cause the step to execute the ThresholdChecker component with the input from the pipeline state as its value parameter and the threshold_config from runtime configuration as its threshold parameter. Based on the component's result ("true" or "false"), it will execute either the PositiveComponent or the NegativeComponent.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
else_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is false.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "IfElse" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

log(message, is_template=True, emit_kwargs=None, name=None)

Create a specialized step for logging messages.

This function creates a LogStep that logs messages within a pipeline. It can be used to log status updates, debugging information, or any other text during pipeline execution.

The message can be a plain string or a template with placeholders for state variables.
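The template behavior can be sketched with Python's str.format. This is illustrative only; `render_log_message` is a hypothetical helper, not the library's implementation:

```python
def render_log_message(message, state, is_template=True):
    """Render a log message, filling curly-brace placeholders from state.

    When is_template is False, the message is returned verbatim.
    """
    return message.format(**state) if is_template else message
```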

Usage example 1 (plain message):

log_step = log("Processing document", is_template=False)

Usage example 2 (template message with state variables):

log_step = log("Processing query: {query} with model: {model_name}")

Parameters:

Name Type Description Default
message str

The message to be logged. May contain placeholders in curly braces for state variables.

required
is_template bool

Whether the message is a template with placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional keyword arguments to pass to the event emitter. Defaults to None.

None
name str | None

A unique identifier for this pipeline step. If None, a name will be auto-generated with the prefix "log_". Defaults to None.

None

Returns:

Name Type Description
LogStep LogStep

A specialized pipeline step for logging messages.

map_reduce(input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None, name=None)

Create a MapReduceStep that maps a function over multiple inputs and reduces the results.

This function creates a step that applies a mapping function to multiple inputs in parallel and combines the results using a reduction function.

The map_func receives a dictionary for each item being processed. This dictionary contains:

1. Values from input_state_map (with list inputs split into individual items).
2. Values from runtime_config_map (if provided).
3. Values from fixed_args (if provided).

The map_func can be either:

- A callable function that takes a dictionary as input and returns a result.
- A Component instance, which will be executed with proper async handling.

Important notes on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.

The step supports automatic broadcasting of scalar values and handles lists appropriately:

1. If multiple list inputs are provided, they must be the same length.
2. Scalar inputs are broadcast to match list lengths.
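The broadcasting rules above can be sketched in plain Python. This is illustrative only; `build_map_items` is a hypothetical helper, not the library's implementation:

```python
def build_map_items(inputs):
    """Split list inputs into per-item dicts, broadcasting scalars.

    All list inputs must share a length; scalar inputs are repeated for
    every item, mirroring the documented broadcasting behavior.
    """
    lists = {k: v for k, v in inputs.items() if isinstance(v, list)}
    lengths = {len(v) for v in lists.values()}
    if len(lengths) > 1:
        raise ValueError(f"List inputs must have the same length, got: {lengths}")
    n = lengths.pop() if lengths else 1
    return [
        {k: (v[i] if isinstance(v, list) else v) for k, v in inputs.items()}
        for i in range(n)
    ]
```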

Usage Examples:

Example 1: Processing a list of items with an async map function.

async def count_words(item):
    await asyncio.sleep(0.1)  # Simulate I/O operation
    return len(item["document"].split())

process_docs = map_reduce(
    input_state_map={
        "document": "documents" # A list, e.g. ["doc1...", "doc2...", "doc3..."]
    },
    output_state="word_counts", # A list of word counts for each document
    map_func=count_words,
    reduce_func=lambda results: sum(results), # Sum word counts
)

# When executed with {"documents": ["doc1...", "doc2...", "doc3..."]},
# returns {"word_counts": 60} (total word count)

Example 2: Broadcasting scalar values to match list length

# Apply a common threshold to multiple values
threshold_check = map_reduce(
    input_state_map={
        "value": "values",          # A list: [5, 10, 15]
        "threshold": "threshold",   # A scalar: 8 (will be broadcast)
    },
    output_state="above_threshold",
    map_func=lambda item: item["value"] > item["threshold"],
    reduce_func=lambda results: results  # Return list of boolean results
)
# When executed with {"values": [5, 10, 15], "threshold": 8},
# returns {"above_threshold": [False, True, True]}

Example 3: Multiple list inputs with the same length

similarity_step = map_reduce(
    input_state_map={
        "doc1": "documents_a",  # ["doc1", "doc2", "doc3"]
        "doc2": "documents_b",  # ["docA", "docB", "docC"]
    },
    output_state="similarity_scores",
    map_func=lambda item: calculate_similarity(item["doc1"], item["doc2"]),
    reduce_func=lambda results: sum(results) / len(results)  # Average similarity
)
# When executed with {"documents_a": ["doc1", "doc2", "doc3"], "documents_b": ["docA", "docB", "docC"]},
# returns {"similarity_scores": 0.75}

Example 4: Using a Component for complex processing instead of a map function

summarizer = TextSummarizer() # Subclass of Component
summarize_step = map_reduce(
    input_state_map={
        "text": "documents",            # List of documents to summarize
        "max_length": "max_length",     # Scalar parameter (broadcasted)
    },
    output_state="summaries",
    map_func=summarizer,
    reduce_func=lambda results: [r["summary"] for r in results]
)
# When executed with {"documents": ["doc1...", "doc2..."], "max_length": 50},
# returns {"summaries": ["summary1...", "summary2..."]}

Parameters:

Name Type Description Default
input_state_map dict[str, str]

Mapping of function arguments to pipeline state keys.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
runtime_config_map dict[str, str] | None

Mapping of arguments to runtime config keys.

None
fixed_args dict[str, Any] | None

Fixed arguments to pass to the functions.

None
name str | None

A unique identifier for this step. Defaults to None, in which case the name will be "MapReduce" followed by the map function name.

None

Returns:

Name Type Description
MapReduceStep MapReduceStep

An instance of MapReduceStep configured with the provided parameters.

no_op(name=None)

Create a NoOpStep that does nothing.

This function creates a NoOpStep, a pass-through step that leaves the pipeline state unchanged.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "NoOp" followed by a unique identifier.

None

Returns:

Name Type Description
NoOpStep NoOpStep

An instance of NoOpStep.

parallel(branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, name=None)

Create a ParallelStep that executes multiple branches concurrently.

This function creates a ParallelStep that runs multiple branches in parallel and merges their results. Each branch can be a single step or a list of steps to execute sequentially.

The step supports two execution modes, controlled by the squash parameter:

1. Squashed (default): uses asyncio.gather() to run branches in parallel within a single LangGraph node. Trade-offs:
   a. Better raw performance
   b. Simpler implementation
   c. Less overhead
   d. Less transparent for debugging and tracing
2. Expanded (squash=False): creates a native LangGraph structure with multiple parallel paths. Trade-offs:
   a. More native LangGraph integration
   b. More transparent for debugging and tracing

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.

Usage example:

parallel_step = parallel(
    branches=[
        step(ComponentA(), {"input": "query"}, "output_a"),
        [
            step(ComponentB1(), {"input": "query"}, "output_b1"),
            step(ComponentB2(), {"input": "output_b1"}, "output_b2")
        ],
        step(ComponentC(), {"input": "query"}, "output_c")
    ],
    input_states=["query"],  # Only 'query' will be passed to branches
)

Important notes about input_states:

1. If input_states is provided, only the listed keys will be passed to the branches.
2. If input_states is None (default), all state keys will be passed to the branches.
3. Each branch is responsible for its own state management once execution begins.
4. For sequential branches (lists of steps), later steps in a branch can still access outputs from earlier steps in the same branch.
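The input_states filtering can be sketched in plain Python. This is illustrative only; `select_branch_state` is a hypothetical helper, not the library's implementation:

```python
def select_branch_state(state, input_states=None):
    """Return the portion of the state forwarded to each parallel branch.

    With input_states=None the full state is forwarded; otherwise only
    the listed keys (when present) are included.
    """
    if input_states is None:
        return dict(state)
    return {k: state[k] for k in input_states if k in state}
```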

Parameters:

Name Type Description Default
branches list[BasePipelineStep | list[BasePipelineStep]]

List of branches to execute in parallel. Each branch can be a single step or a list of steps to execute sequentially.

required
input_states list[str] | None

Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None.

None
squash bool

Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel. If False, uses native LangGraph structures for parallelism. Defaults to True.

True
runtime_config_map dict[str, str] | None

Mapping of input keys to runtime config keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to include in the state passed to branches. Defaults to None.

None
name str | None

A unique identifier for this parallel step. Defaults to None, in which case a name will be auto-generated.

None

Returns:

Name Type Description
ParallelStep ParallelStep

An instance of ParallelStep configured with the provided branches.

step(component, input_state_map, output_state=None, runtime_config_map=None, fixed_args=None, emittable=True, name=None)

Create a ComponentStep with a concise syntax.

This function creates a ComponentStep, which wraps a component and manages its inputs and outputs within the pipeline.

Usage example 1:

retriever = Retriever()
retriever_step = step(retriever, {"query": "input_query"}, "retrieved_data")

This will cause the step to execute the Retriever component with the following behavior:

1. It will pass the input_query from the pipeline state to the query argument of the Retriever.
2. It will store the retrieved_data from the Retriever result in the pipeline state.

Usage example 2:

retriever = Retriever()
custom_emitter = CustomEventEmitter()
retriever_step = step(
    retriever, {"query": "input_query"}, "retrieved_data", {"top_k": "top_k"}, {"event_emitter": custom_emitter}
)

This will cause the step to execute the Retriever component with the following behavior:

1. It will pass the input_query from the pipeline state to the query argument of the Retriever.
2. It will pass the top_k from the runtime configuration to the top_k argument of the Retriever.
3. It will pass the custom_emitter to the event_emitter argument of the Retriever without modification.
4. It will store the retrieved_data from the Retriever result in the pipeline state.

Parameters:

Name Type Description Default
component Component

The component to be executed in this step.

required
input_state_map dict[str, str]

Mapping of component input arguments to pipeline state keys.

required
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of component arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used.

None
emittable bool

Whether an event emitter should be passed to the component, if available in the state and not explicitly provided in any of the arguments. Defaults to True.

True
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be the component's class name followed by a unique identifier.

None

Returns:

Name Type Description
ComponentStep ComponentStep

An instance of ComponentStep configured with the provided parameters.

subgraph(subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None, name=None)

Create a SubgraphStep that executes another pipeline as a subgraph.

This function creates a SubgraphStep that allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.

The SubgraphStep gracefully handles missing state keys - if a key specified in input_state_map is not present in the parent state, it will be omitted from the subgraph input rather than causing an error. This allows for flexible composition of pipelines with different state schemas.
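The graceful input mapping described above can be sketched in plain Python. This is illustrative only; `map_subgraph_input` is a hypothetical helper, not the library's implementation:

```python
def map_subgraph_input(parent_state, input_state_map):
    """Build the subgraph input from the parent state.

    input_state_map maps subgraph input keys to parent state keys; any
    mapping whose parent key is missing is simply omitted rather than
    raising an error.
    """
    return {
        sub_key: parent_state[parent_key]
        for sub_key, parent_key in input_state_map.items()
        if parent_key in parent_state
    }
```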

Usage example:

from typing import TypedDict
from gllm_pipeline.pipeline.pipeline import Pipeline

# Define state schemas using TypedDict
class SubgraphState(TypedDict):
    query: str
    retrieved_data: list
    reranked_data: list

class ParentState(TypedDict):
    user_input: str
    query: str
    reranked: list
    response: str

# Define a subgraph pipeline with its own state schema
subgraph_pipeline = Pipeline(
    [
        step(Retriever(), {"query": "query"}, "retrieved_data"),
        step(Reranker(), {"data": "retrieved_data"}, "reranked_data")
    ],
    state_type=SubgraphState
)

# Use the subgraph in a parent pipeline
parent_pipeline = Pipeline(
    [
        step(QueryProcessor(), {"input": "user_input"}, "query"),
        subgraph(
            subgraph_pipeline,
            {"query": "query"},  # Map parent state to subgraph input
            {"reranked": "reranked_data"},  # Map subgraph output to parent state
            runtime_config_map={"top_k": "retrieval_top_k"}
        ),
        step(ResponseGenerator(), {"data": "reranked"}, "response")
    ],
    state_type=ParentState
)

# When the parent pipeline runs:
# 1. QueryProcessor processes user_input and produces query
# 2. SubgraphStep creates a new state for the subgraph with query from parent
# 3. Subgraph executes its steps (Retriever → Reranker)
# 4. SubgraphStep maps reranked_data from subgraph to reranked in parent
# 5. ResponseGenerator uses reranked to produce response

Parameters:

Name Type Description Default
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_state_map dict[str, str] | None

Mapping of subgraph input keys to parent pipeline state keys. Keys that don't exist in the parent state will be gracefully ignored. If None, all subgraph inputs will be passed as-is.

None
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. If None, all subgraph outputs will be passed as-is.

None
runtime_config_map dict[str, str] | None

Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier.

None

Returns:

Name Type Description
SubgraphStep SubgraphStep

An instance of SubgraphStep configured with the provided parameters.

switch(condition, branches, input_state_map=None, output_state=None, default=None, runtime_config_map=None, fixed_args=None, name=None)

Create a ConditionalStep with multiple branches.

This function creates a ConditionalStep that can execute one of multiple branches based on a condition.

Usage example with a Callable condition:

# Using a Callable condition - receives merged state and config directly
def extract_command(data):
    # Access both state and config in a single dictionary
    query = data["query"]
    separator = data["separator"]  # From runtime config or state
    return query.split(separator)[0]

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
}
default = step(NoOpComponent(), {}, "no_op_result")

switch_step = switch(
    extract_command,
    branches,
    # input_state_map and runtime_config_map are ignored for Callable conditions
    # but can still be specified (they will have no effect)
    output_state="command_type",
    default=default,
    fixed_args={"separator": " "}  # This will be merged with state and config
)

This will cause the step to execute the SearchComponent if the first part of the query in the pipeline state is "search", the FilterComponent if it is "filter", and the NoOpComponent otherwise. The separator is provided as a fixed argument. The result of the condition will be stored in the pipeline state under the key command_type.

Usage example with a Component condition:

# Using a Component condition - requires input_state_map and runtime_config_map
command_extractor = CommandExtractor()  # A Component that extracts command from query

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
    "sort": step(SortComponent(), {"query": "query"}, "sort_result"),
}
default = step(DefaultComponent(), {"query": "query"}, "default_result")

switch_step = switch(
    command_extractor,
    branches,
    input_state_map={"text": "query"},  # Maps pipeline state to component input
    output_state="command_type",
    default=default,
    runtime_config_map={"delimiter": "separator_config"},  # Maps runtime config to component input
    fixed_args={"lowercase": True}  # Fixed arguments passed directly to component
)

This will cause the step to execute the CommandExtractor component with the query from the pipeline state as its text parameter and the separator_config from runtime configuration as its delimiter parameter. Based on the component's result (which should be one of "search", "filter", "sort", or something else), it will execute the corresponding branch component or the default component.

Parameters:

Name Type Description Default
condition ConditionType

The condition to evaluate for branch selection.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
default BasePipelineStep | list[BasePipelineStep] | None

Default branch to execute if no condition matches. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly, and this parameter is ignored. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Switch" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

terminate(name=None)

Create a TerminatorStep to end pipeline execution.

This function creates a TerminatorStep that explicitly terminates a branch or the entire pipeline.

Usage example:

early_exit = terminate("early_exit")

pipeline = (
    step_a
    | if_else(should_stop, early_exit, step_b)
    | step_c
)

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Terminator" followed by a unique identifier.

None

Returns:

Name Type Description
TerminatorStep TerminatorStep

An instance of TerminatorStep.

toggle(condition, if_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)

Create a ConditionalStep that toggles between a branch and a no-op.

This function creates a ConditionalStep that executes a branch if the condition is true, and does nothing (no-op) if the condition is false.

The condition can be one of:

1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (converted to "true"/"false").
4. A string key that is looked up in the merged state data (state + runtime config + fixed args). The value is evaluated for truthiness: any non-empty, non-zero, non-False value is considered True.
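The string-key condition can be sketched in plain Python. This is illustrative only; `resolve_string_condition` is a hypothetical helper, not the library's implementation:

```python
def resolve_string_condition(key, state, runtime_config=None, fixed_args=None):
    """Evaluate a string-key toggle condition.

    The key is looked up in the merged data (state + runtime config +
    fixed args) and the value evaluated for truthiness; a missing key
    counts as False.
    """
    merged = {**(state or {}), **(runtime_config or {}), **(fixed_args or {})}
    return bool(merged.get(key))
```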

Usage example with a Callable condition:

# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["feature_enabled"] and data["user_tier"] >= 2
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    condition,
    feature_step,
    output_state="feature_status",
    fixed_args={"user_tier": 2}  # This will be merged with state and config
)

This will execute the FeatureComponent only if both feature_enabled is true and user_tier is at least 2. Otherwise, it will do nothing. The condition result will be stored in the pipeline state under the key feature_status.

Usage example with a Component condition:

# Using a Component condition - requires input_state_map and runtime_config_map
feature_checker = FeatureChecker()  # A Component that returns "true" or "false"
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    feature_checker,
    feature_step,
    input_state_map={"user_id": "current_user"},  # Maps pipeline state to component input
    output_state="feature_status",
    runtime_config_map={"feature_name": "target_feature"},  # Maps runtime config to component input
    fixed_args={"check_permissions": True}  # Fixed arguments passed directly to component
)

This will cause the step to execute the FeatureChecker component with the current_user from the pipeline state as its user_id parameter and the target_feature from runtime configuration as its feature_name parameter. Based on the component's result ("true" or "false"), it will either execute the FeatureComponent or do nothing.

Usage example with string key:

# Using a string key - looks up the value in the merged state and config
feature_step = step(FeatureComponent(), {"input": "input"}, "output")
toggle_step = toggle("feature_enabled", feature_step, runtime_config_map={"feature_enabled": "enable_feature"})

This will execute the FeatureComponent only if the value at the key feature_enabled in the pipeline state (or enable_feature in the runtime config) evaluates to True. Otherwise, it will do nothing.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool] | str

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
input_state_map dict[str, str] | None

Mapping of condition input arguments to pipeline state keys. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
runtime_config_map dict[str, str] | None

Mapping of condition input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Toggle" followed by a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

transform(operation, input_states, output_state, runtime_config_map=None, fixed_args=None, name=None)

Create a StateOperatorStep for transforming state data.

This function creates a StateOperatorStep that applies a transformation operation to the pipeline state. Note that the function operation should accept a dictionary of input data and return the operation result.

Usage example:

def sort(data: dict) -> list:
    is_reverse = data["reverse"]
    return sorted(data["chunk"], reverse=is_reverse)


transform_step = transform(sort, ["chunk"], "output", runtime_config_map={"reverse": "reverse"})

This will cause the step to execute the sort operation on the chunk in the pipeline state. The result will be stored in the pipeline state under the key output. The behavior is controlled by the runtime configuration key reverse.

Parameters:

Name Type Description Default
operation Callable[[dict[str, Any]], Any]

The operation to execute on the input data.

required
input_states list[str]

List of input state keys required by the operation.

required
output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

required
runtime_config_map dict[str, str] | None

Mapping of operation input arguments to runtime configuration keys. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the operation. Defaults to None.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Transform" followed by the operation's function name and a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured with the provided parameters.