Steps

Contains the base classes and implementations of pipeline steps.

ComponentStep(name, component, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes a specific component.

This step wraps a component, manages its inputs and outputs, and integrates it into the pipeline.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

component Component

The component to be executed in this step.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initializes a new ComponentStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
component Component

The component to be executed in this step.

required
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
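The input_map resolution rule can be sketched in plain Python. This is an illustrative stand-in, not the library's implementation: the `Val` wrapper and the key names here are hypothetical placeholders for the library's own types.

```python
# Sketch of input_map resolution: a str spec is looked up in the state,
# a Val spec is passed through as a fixed literal. Val here is a stand-in
# for the library's own wrapper.
from dataclasses import dataclass
from typing import Any


@dataclass
class Val:
    """Marks a fixed literal value rather than a state/config lookup."""
    value: Any


def resolve_input_map(input_map: dict[str, Any], state: dict[str, Any]) -> dict[str, Any]:
    """Resolve each component argument from the state or a fixed value."""
    return {
        arg: spec.value if isinstance(spec, Val) else state[spec]
        for arg, spec in input_map.items()
    }


state = {"query": "hello", "top_k": 5}
input_map = {"text": "query", "limit": Val(10)}
print(resolve_input_map(input_map, state))  # {'text': 'hello', 'limit': 10}

# The real step would then be constructed along these lines:
# ComponentStep(name="retrieve", component=retriever,
#               input_map=input_map, output_state="chunks")
```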

execute(state, runtime, config=None) async

Executes the component and processes its output.

This method validates inputs, prepares data, executes the component, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The update to the pipeline state after this step's operation, or None if output_state is None. When not None, this includes new or modified data produced by the component, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during component execution.

TimeoutError

If the component execution times out.

CancelledError

If the component execution is cancelled.

ConditionalStep(name, branches, condition=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BranchingStep, HasInputsMixin

A conditional pipeline step that conditionally executes different branches based on specified conditions.

This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.

A minimal usage requires defining the branches to execute based on a condition, which is a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs: 1. If the condition is a Component, input_map is used to map the pipeline's state and config to the component's inputs. 2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. In this case, input_map is not used to build the payload and should not be passed.

Examples:

ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)

This will execute step_a if the query contains "<A>", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches, the step will raise an error.
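When multiple conditions are supplied, their individual results are aggregated into a single routing key; the default aggregator joins them with a semicolon. A minimal sketch of that behavior (the condition logic and state keys are illustrative, not part of the library):

```python
# Default-style aggregator: joins condition results with ";" (mirrors the
# documented default condition_aggregator).
def aggregate(results: list) -> str:
    return ";".join(str(r) for r in results)


def length_condition(x: dict) -> str:
    return "long" if len(x["query"]) > 20 else "short"


def language_condition(x: dict) -> str:
    return "en" if x.get("lang") == "en" else "other"


payload = {"query": "hi", "lang": "en"}
route = aggregate([length_condition(payload), language_condition(payload)])
print(route)  # short;en

# The aggregated key would then select a branch, e.g.:
# branches={"short;en": fast_step, DEFAULT_BRANCH: generic_step}
```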

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps.

condition list[ConditionType] | None

The condition(s) to evaluate for branch selection.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | None

Key to store the condition result in the state, if desired.

condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initializes a new ConditionalStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

A dictionary mapping condition results to steps or lists of steps to execute.

required
condition ConditionType | list[ConditionType] | None

The condition(s) to evaluate for branch selection. If a Callable, it receives the merged state and config as keyword arguments. If None, the condition is evaluated from the state. Defaults to None.

None
output_state str | None

Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None.

None
condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";").

lambda x: ';'.join(str(i) for i in x)
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val|Group) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. 4. dict[str, Group] to wrap lookup/literal specs while keeping conditional behavior unchanged. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

execute(state, runtime, config=None) async

Executes the conditional step, determines the route, and returns a Command.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Name Type Description
Command Command

A LangGraph Command object with 'goto' for routing and 'update' for state changes.

execute_direct(state, runtime, config=None) async

Execute this step directly, handling both branch selection and execution.

This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Updates to apply to the pipeline state, or None if no updates.

select_path(state, runtime) async

Determines the logical route key based on the evaluated condition(s).

This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
runtime Runtime

Runtime information for this step's execution.

required

Returns:

Name Type Description
str str

The identifier of the selected logical route.

Raises:

Type Description
Exception

May raise exceptions from condition evaluation.

EmptyStepErrorHandler(output_state)

Bases: BaseStepErrorHandler

Strategy that replaces the values of the configured output states with None on error.

Attributes:

Name Type Description
output_state list[str]

Output key(s) to map input values to.

Initialize the strategy with optional output state mapping.

Parameters:

Name Type Description Default
output_state str | list[str]

Output key(s) to map input values to. Can be a single string or a list of strings.

required

FallbackStepErrorHandler(fallback)

Bases: BaseStepErrorHandler

Strategy that executes a fallback callable on error.

Attributes:

Name Type Description
fallback Callable[[BaseException, PipelineState, Runtime, ErrorContext], dict[str, Any] | None]

A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state.

Initialize the strategy with a fallback callable.

Parameters:

Name Type Description Default
fallback Callable[[BaseException, PipelineState, Runtime, ErrorContext], dict[str, Any] | None]

A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state.

required
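A minimal fallback callable matching the documented `(error, state, runtime, context)` signature. The returned keys here are illustrative; the handler itself would be constructed as shown in the trailing comment.

```python
# A fallback that swaps the failed step's output for a safe default and
# records the error message (key names are illustrative).
def fallback(error, state, runtime, context):
    return {"answer": None, "error_message": str(error)}


# handler = FallbackStepErrorHandler(fallback)
print(fallback(ValueError("LM timeout"), {}, None, None))
# {'answer': None, 'error_message': 'LM timeout'}
```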

GoToStep(name, target, input_map=None, output_state=None, allow_end=True, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that enables dynamic flow control by jumping to specified nodes.

This step allows for dynamic routing in pipelines by jumping to different nodes based on runtime conditions. The target can be a static node name, a state key, or a callable that determines the target at runtime.
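The three target forms can be sketched with a small resolution helper. The helper and all names here are hypothetical; a `Val` is assumed to expose its payload via a `.value` attribute.

```python
# Illustrative resolution of the three target forms: a callable is invoked
# with the resolved inputs, a Val-like object unwraps via .value, and a
# plain string passes through unchanged.
def resolve_target(target, inputs: dict) -> str:
    if callable(target):
        return target(inputs)
    return getattr(target, "value", target)


assert resolve_target("summarize", {}) == "summarize"
route = resolve_target(lambda x: "retry" if x["attempts"] < 3 else "END",
                       {"attempts": 1})
print(route)  # retry

# The real step would be wired along these lines:
# GoToStep(name="router", target=..., input_map={"attempts": "attempt_count"})
```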

Attributes:

Name Type Description
name str

A unique identifier for the pipeline step.

target str | Val | Callable[[dict[str, Any]], str]

The target node specification.

input_map dict[str, str | Val]

Mapping of argument names to either a state/config key (str) or a fixed value (Val). Inherited from HasInputsMixin.

output_state str | None

The state key to store the resolved target.

allow_end bool

Whether jumping to 'END' is allowed.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy. Inherited from BasePipelineStep.

cache_store Any | None

The cache store used for caching step results, if configured. Inherited from BasePipelineStep.

is_cache_enabled bool

Property indicating whether caching is enabled for this step. Inherited from BasePipelineStep.

Initializes a new GoToStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
target str | Val | Callable[[dict[str, Any]], str]

The target node to jump to. Can be a literal string node name, a Val containing the node name, or a callable that takes the resolved inputs and returns the target node name.

required
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
output_state str | None

Key to store the target node name in the pipeline state. Defaults to None.

None
allow_end bool

Whether to allow jumping to the END node. Defaults to True.

True
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

add_to_graph(graph, previous_endpoints, retry_policy=None)

Adds the GoToStep to the graph and creates edges.

Parameters:

Name Type Description Default
graph Any

The LangGraph graph to add the step to.

required
previous_endpoints list[str]

List of source node names to connect from.

required
retry_policy RetryPolicy | None

Optional retry policy to override the step's default. Defaults to None.

None

Returns:

Type Description
list[str]

list[str]: List of endpoint names after adding this step.

apply_exclusions(exclusions)

Applies exclusions to this goto step.

Parameters:

Name Type Description Default
exclusions ExclusionSet

The exclusion set to apply.

required

execute(state, runtime, config=None) async

Executes the goto step and returns a LangGraph Command.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime

The runtime context containing config.

required
config RunnableConfig | None

Optional runnable configuration. Defaults to None.

None

Returns:

Name Type Description
Command Command

A LangGraph Command with goto parameter for routing.

Raises:

Type Description
ValueError

If the resolved target is invalid.

KeyError

If state key is missing for string targets.

execute_direct(state, runtime, config=None) async

Executes the goto step and returns a state update.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime

The runtime context containing config.

required
config RunnableConfig | None

Optional runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: State update with resolved target if output_state is specified, None otherwise.

Raises:

Type Description
ValueError

If the resolved target is invalid.

KeyError

If state key is missing for string targets.

get_mermaid_diagram(indent=0)

Generates a Mermaid diagram representation of this goto step.

Parameters:

Name Type Description Default
indent int

Number of spaces to indent the diagram. Defaults to 0.

0

Returns:

Name Type Description
str str

The Mermaid diagram string.

GuardStep(name, condition, success_branch, failure_branch=None, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: ConditionalStep

A conditional step that can terminate pipeline execution if a condition is not met.

This step evaluates a condition and either: 1. Continues execution through the success_branch if the condition is True 2. Executes the failure_branch and terminates if the condition is False

Examples:

pipeline = (
    step_a
    | GuardStep(
        name="auth_check",
        condition=lambda x: x["is_authenticated"],
        success_branch=step_b,
        failure_branch=error_handling_step,
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

condition ConditionType

The condition to evaluate.

input_map dict[str, str | Any] | None

Unified input map.

success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately on False condition.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new GuardStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
condition ConditionType

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

InterruptStep(name, message='Pipeline interrupted.', resume_value_map=None, **kwargs)

Bases: BasePipelineStep

A pipeline step that pauses execution using LangGraph's interrupt feature.

When the pipeline reaches this step, it will throw an interrupt which pauses execution and returns control to the caller (along with the current state). To resume, the user must invoke the pipeline again with a Command(resume=...) payload and the same thread_id.

The value returned from the interrupt() call (which is whatever the user provides in Command(resume=val)) will be stored in the state key specified by resume_value_map if provided.
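The mapping from resumed value to state updates can be sketched as follows. The single-string case and the TypeError for non-dict values are documented above; extracting each listed key from a dict resume value is an assumption, and all key names are illustrative.

```python
# Sketch of how a resumed value lands in the state: a str key stores the
# whole value, while a list of keys expects a dict resume value.
def apply_resume(resume_value_map, resumed):
    if isinstance(resume_value_map, str):
        return {resume_value_map: resumed}
    if not isinstance(resumed, dict):
        raise TypeError("a list resume_value_map requires a dict resume value")
    return {key: resumed[key] for key in resume_value_map}


print(apply_resume("approval", True))  # {'approval': True}
print(apply_resume(["score", "note"], {"score": 5, "note": "ok"}))
# {'score': 5, 'note': 'ok'}

# To resume the real pipeline, re-invoke it with Command(resume=...) and
# the same thread_id used in the first invocation.
```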

Initializes an InterruptStep.

Parameters:

Name Type Description Default
name str

The unique identifier for this step.

required
message str | dict[str, Any]

The payload to emit when the interrupt occurs. Defaults to "Pipeline interrupted.".

'Pipeline interrupted.'
resume_value_map str | list[str] | None

State key(s) where the resume value will be stored. Defaults to None.

None
**kwargs Any

Additional configuration (e.g., error_handler) for the BasePipelineStep.

{}

Raises:

Type Description
ValueError

If cache is provided in kwargs, as this step cannot be cached.

execute(state, runtime, config=None) async

Execute the interrupt and handle the resumed value.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime

The pipeline runtime.

required
config RunnableConfig | None

The RunnableConfig.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: State updates containing the resumed value if a resume_value_map is configured.

Raises:

Type Description
TypeError

If resume_value_map is a list but the resumed value is not a dictionary.

KeepStepErrorHandler()

Bases: BaseStepErrorHandler

Strategy that preserves the current state on error.

Initialize the keep error handler.

LogStep(name, message, is_template=True, emit_kwargs=None, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep

A specialized pipeline step for logging messages.

This step uses the Messenger component to log messages during pipeline execution. It supports both plain text messages and template messages with placeholders for state variables.
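Template substitution can be illustrated with plain `str.format`-style filling, which is an assumption about the Messenger's behavior; the keys and message are examples only.

```python
# With is_template=True, curly-brace placeholders are filled from the
# pipeline state (str.format-style substitution assumed here).
message = "Retrieved {num_chunks} chunks for query: {query}"
state = {"num_chunks": 4, "query": "pricing"}
print(message.format(**state))  # Retrieved 4 chunks for query: pricing

# LogStep(name="log_retrieval", message=message, is_template=True)
```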

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

messenger Messenger

The messenger component used to format and send messages.

emit_kwargs dict[str, Any]

Additional arguments to pass to the event emitter.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new LogStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
message str

The message to be logged. May contain placeholders in curly braces for state variables.

required
is_template bool

Whether the message is a template with placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional keyword arguments to pass to the event emitter. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

execute(state, runtime, config=None) async

Executes the log step by formatting and emitting the message.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

Raises:

Type Description
RuntimeError

If an error occurs during message emission.

MapReduceStep(name, output_state, map_func, reduce_func=lambda results: results, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep, HasInputsMixin

A step that applies a mapping function to multiple inputs and reduces the results.

This step performs parallel processing of multiple input items using: 1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_map). 2. A reduce function that combines all the mapped results.

Note on parallel execution: 1. For true parallelism, the map_func MUST be an async function or a Component. 2. Synchronous map functions will block the event loop and run sequentially.

Input handling: 1. Automatically detects which inputs are lists/sequences. 2. Ensures all list inputs have the same length. 3. Broadcasts scalar values to match list lengths. 4. If no list inputs, applies the map function once to the whole input.

Internally, this step uses asyncio.gather() for efficient parallel execution.
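The fan-out behavior described above can be sketched in plain asyncio. The broadcasting helper, key names, and reduce step here are illustrative stand-ins for the library's internals.

```python
# Sketch of MapReduce fan-out: list inputs are zipped item-wise, scalars
# are broadcast, items are mapped concurrently, and results are reduced.
import asyncio


async def map_func(item: dict) -> int:
    # Async map functions run concurrently under asyncio.gather().
    return item["x"] * item["scale"]


def broadcast(inputs: dict) -> list[dict]:
    # Build one payload per list index, repeating scalar values.
    lengths = {len(v) for v in inputs.values() if isinstance(v, list)}
    n = lengths.pop() if lengths else 1
    return [{k: (v[i] if isinstance(v, list) else v) for k, v in inputs.items()}
            for i in range(n)]


async def run() -> int:
    items = broadcast({"x": [1, 2, 3], "scale": 10})
    results = await asyncio.gather(*(map_func(i) for i in items))
    return sum(results)  # stands in for reduce_func


print(asyncio.run(run()))  # 60
```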

Attributes:

Name Type Description
name str

A unique identifier for this step.

map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. Will be run in parallel if the function is an asynchronous function.

reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results.

input_map InputMapSpec | None

Unified input map.

output_state str

Key to store the reduced result in the pipeline state.

retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig.

Initialize a new MapReduceStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_map.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val|Group) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. 4. dict[str, Group] to keep wrapped lookup/literal values intact across fan-out. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

execute(state, runtime, config=None) async

Execute the map and reduce operations.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The reduced result stored under output_state.

Raises:

Type Description
RuntimeError

If an error occurs during execution.

NoOpStep(name, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep

A step that does nothing.

This step is useful when you want to add a step that does not perform any processing. For example, you can use this step to implement a toggle pattern for a certain component.

Examples:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "execute": step_b,
            "continue": NoOpStep("no_op")
        },
        condition=lambda x: "execute" if x["should_execute"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

execute(state, runtime, config=None) async

Executes this step, which does nothing.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Name Type Description
None None

This step does not modify the pipeline state.

ParallelStep(name, branches, input_states=None, squash=True, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BranchingStep, HasInputsMixin

A pipeline step that executes multiple branches in parallel.

This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.

The step supports two execution modes controlled by the squash parameter: 1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. Offers better raw performance, a simpler implementation, and less overhead, but is less transparent for debugging and tracing. 2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. Offers more native LangGraph integration and more transparency for debugging and tracing.

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
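The squashed mode can be sketched in plain asyncio. The branch functions and state keys are illustrative, and merging updates via a dict update is an assumption about how branch results are combined.

```python
# Sketch of squashed-mode execution: branches run concurrently via
# asyncio.gather() and their state updates are merged into one dict.
import asyncio


async def branch_a(state: dict) -> dict:
    return {"summary": state["text"][:5]}


async def branch_b(state: dict) -> dict:
    return {"length": len(state["text"])}


async def run(state: dict) -> dict:
    updates = await asyncio.gather(branch_a(state), branch_b(state))
    merged: dict = {}
    for update in updates:
        merged.update(update)
    return merged


print(asyncio.run(run({"text": "hello world"})))
# {'summary': 'hello', 'length': 11}

# ParallelStep(name="fanout", branches={"a": step_a, "b": step_b}, squash=True)
```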

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, PipelineSteps]

The branches to execute in parallel.

input_map dict[str, str | Val] | None

Unified input map.

squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initialize a new ParallelStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches list | dict[str, PipelineSteps]

The branches to execute in parallel. Can be either: List format: Each branch can be: 1. A single step 2. A list of steps to execute sequentially Example: [step1, [step2, step3], step4] Dict format: Keys are branch names, values can be: 1. A single step 2. A list of steps to execute sequentially Example: {"analysis": step1, "validation": [step2, step3], "cleanup": step4} Enables more intuitive step exclusion using branch names.

required
input_states list[str] | None

Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None.

None
squash bool

Whether to squash execution into a single node. 1. If True, uses asyncio.gather() to run branches in parallel. This will create a single node. 2. If False, uses native LangGraph structures for parallelism. This will create multiple nodes. Defaults to True.

True
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

add_to_graph(graph, previous_endpoints, retry_policy=None)

Handle both squashed and expanded modes.

For squashed mode: add the parallel step as a single node. For expanded mode: add the parallel step as a single node and add children to graph.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

Endpoints from previous steps to connect to.

required
retry_policy RetryPolicy | None

Retry policy to propagate to child steps. Defaults to None, in which case the retry policy of the step is used.

None

Returns:

Type Description
list[str]

list[str]: Exit points after adding all child steps.

execute(state, runtime, config=None) async

Execute all branches in parallel and merge their results.

This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced.

Raises:

Type Description
CancelledError

If execution is cancelled, preserved with added context.

BaseInvokerError

If an error occurs during LM invocation.

RuntimeError

For all other exceptions during execution, wrapped with context information.

TimeoutError

If execution times out, preserved with added context.

ValidationError

If input validation fails.

PauseStep(name, **kwargs)

Bases: BasePipelineStep

A named marker step for use as a static breakpoint target.

PauseStep does not pause execution by itself. It exists only so that its name can be referenced in interrupt_before or interrupt_after lists when calling Pipeline.invoke().

This is the recommended pattern for debugging: insert one or more PauseStep instances at interesting points in the pipeline, then selectively activate breakpoints at invocation time without recompiling the graph.

Examples:

from gllm_pipeline.steps import pause, step
from gllm_pipeline.pipeline import Pipeline

bp = pause(name="before_llm")
pipeline = Pipeline([preprocess, bp, llm_step, postprocess])

# Debug run — pause right before the LLM call:
result = await pipeline.invoke(state, interrupt_before=["before_llm"])

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

Initialize a PauseStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this step. This name is used as the target for interrupt_before or interrupt_after.

required
**kwargs Any

Forwarded to BasePipelineStep.__init__.

{}

Raises:

Type Description
ValueError

If cache is provided in kwargs, as this step cannot be cached.

execute(state, runtime, config=None) async

Execute this step, which does nothing.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

RaiseStepErrorHandler()

Bases: BaseStepErrorHandler

Strategy that raises exceptions with enhanced context.

Initialize the raise error handler.

StateOperatorStep(name, input_states=None, output_state=None, operation=None, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that performs an operation on the pipeline state and updates it.

This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.
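For example, the operation is just a callable over the extracted inputs. A minimal sketch, assuming a state key `scores` and illustrative step names (the commented-out construction shows how the operation would plug into a StateOperatorStep):

```python
# A minimal operation for a StateOperatorStep: it receives a dict of the
# extracted inputs and returns the value to store under output_state.
def average_score(data: dict) -> float:
    return sum(data["scores"]) / len(data["scores"])

# The step itself would be constructed as:
# avg_step = StateOperatorStep(
#     name="average_score",
#     input_states=["scores"],
#     output_state="avg_score",
#     operation=average_score,
# )

print(average_score({"scores": [2, 4, 6]}))  # 4.0
```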

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

operation Callable[[dict[str, Any]], Any]

The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new StateOperatorStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
input_states list[str]

Keys of the state data required by the operation.

None
output_state str | list[str]

Key(s) to store the operation result in the pipeline state.

None
operation Callable[[dict[str, Any]], Any]

The operation to execute. It should accept a dictionary of input data and return the operation result.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

execute(state, runtime, config=None) async

Executes the operation and processes its output.

This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state.

Raises:

Type Description
RuntimeError

If an error occurs during operation execution.

SubgraphStep(name, subgraph, output_state_map=None, input_map=None, retry_config=None, error_handler=None, cache=None)

Bases: BaseCompositeStep, HasInputsMixin

A pipeline step that executes another pipeline as a subgraph.

This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
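The output_state_map determines how subgraph results flow back into the parent state. The helper below is an illustrative sketch of that mapping, not the actual implementation; per the execute documentation, a missing subgraph key yields None:

```python
# Illustrative helper mirroring how output_state_map translates a subgraph
# result into a parent-state update: parent key <- subgraph output key.
def map_subgraph_outputs(result: dict, output_state_map: dict) -> dict:
    # Missing subgraph keys map to None, matching the documented behavior.
    return {parent: result.get(sub) for parent, sub in output_state_map.items()}

update = map_subgraph_outputs(
    {"answer": "42"},
    {"final_answer": "answer", "trace": "debug_trace"},
)
print(update)  # {'final_answer': '42', 'trace': None}
```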

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

subgraph Pipeline

The pipeline to be executed as a subgraph.

input_map dict[str, str | Val] | None

Unified input map.

output_state_map dict[str, str]

Mapping of parent pipeline state keys to subgraph output keys.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

Initializes a new SubgraphStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using the SDK's RetryConfig. If None, no retry policy is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to be used for this step. If None, no error handler is applied.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

apply_exclusions(exclusions)

Apply exclusions to this subgraph and its children.

A subgraph requires no internal structural changes of its own: this method marks the step itself and propagates child exclusions uniformly to all steps in the subgraph.

Parameters:

Name Type Description Default
exclusions ExclusionSet

The exclusion set to apply.

required

execute(state, runtime, config=None) async

Executes the subgraph and processes its output.

This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline, containing all data.

required
runtime Runtime

Runtime information for this step's execution.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None.

Raises:

Type Description
RuntimeError

If an error occurs during subgraph execution, with details about which step caused the error.

TerminatorStep(name, retry_config=None, error_handler=None, cache=None)

Bases: BasePipelineStep

A step that connects previous steps to the END node.

This step is useful when you want to explicitly terminate a branch or the entire pipeline. It has no processing logic and simply acts as a connection point to the END node.

Examples:

pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "terminate": TerminatorStep("early_end"),
            "continue": step_b
        },
        condition=lambda x: "terminate" if x["should_stop"] else "continue"
    )
    | step_c
)

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

add_to_graph(graph, previous_endpoints, retry_policy=None)

Adds this step to the graph and connects it to the END node.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add this step to.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required
retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy. If None, the retry policy of the step is used. If the step is not a retryable step, this parameter is ignored.

None

Returns:

Type Description
list[str]

list[str]: Empty list as this step has no endpoints (it terminates the flow).

execute(state, runtime, config=None) async

Executes this step, which does nothing but pass through the state.

Parameters:

Name Type Description Default
state PipelineState

The current pipeline state.

required
runtime Runtime

The runtime information.

required
config RunnableConfig | None

The runnable configuration. Defaults to None.

None

WhileDoStep(name, body, condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache=None)

Bases: BaseCompositeStep, HasInputsMixin

A pipeline step that executes a do-while loop construct.

The loop body executes at least once; a condition node appended at the end of the body then determines whether to jump back to the loop's entry point.
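A loop condition is a callable over the mapped inputs that decides whether to run another iteration. A sketch with illustrative state keys and step names (the commented-out construction assumes a hypothetical `refine_step` body):

```python
# Loop condition for a WhileDoStep: returns True to jump back to the loop
# entry, False to exit. State keys here are illustrative assumptions.
def needs_more_refinement(data: dict) -> bool:
    return data["iteration"] < data["max_iterations"]

# loop = WhileDoStep(
#     name="refine_loop",
#     body=[refine_step],  # hypothetical body step(s)
#     condition=needs_more_refinement,
#     input_map={"iteration": "iteration", "max_iterations": Val(3)},
#     output_state="continue_loop",
# )

print(needs_more_refinement({"iteration": 0, "max_iterations": 3}))  # True
```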

Attributes:

Name Type Description
name str

A unique identifier for the pipeline step.

body BasePipelineStep | list[BasePipelineStep]

The step(s) to execute in the loop body.

condition LoopConditionType

The condition to evaluate after each iteration.

input_map InputMapSpec | None

Unified input map for the condition.

output_state str | None

Key to store the condition result in the pipeline state.

retry_policy RetryPolicy | None

Configuration for retry behavior.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initialize a WhileDoStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
body BasePipelineStep | list[BasePipelineStep]

The loop body to execute.

required
condition LoopConditionType

The condition to evaluate at the end of each iteration.

required
input_map InputMapSpec | None

Unified input map for the condition. Defaults to None.

None
output_state str | None

Key to store the condition result in the state. Defaults to None.

None
retry_config RetryConfig | None

Retry config applied to the condition node. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handling strategy for condition. Defaults to None.

None
cache CacheConfig | None

Caching config for the condition. Defaults to None.

None

Raises:

Type Description
TypeError

If body is empty or condition is invalid.

apply_exclusions(exclusions)

Apply exclusions to this composite step and its children.

Parameters:

Name Type Description Default
exclusions ExclusionSet

The exclusion set to apply.

required

execute(state, runtime, config=None) async

Evaluate the condition and route back or exit the loop.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline workflow.

required
runtime Runtime

The runtime context of the pipeline workflow execution.

required
config RunnableConfig | None

Execution configuration for the node. Defaults to None.

None

Returns:

Type Description
Command | dict[str, Any] | None

Command | dict[str, Any] | None: A LangGraph command directing execution back to the loop entry, an update payload when the loop exits, or None when there is no state update.

execute_direct(state, runtime, config=None) async

Reject direct execution for WhileDoStep.

Parameters:

Name Type Description Default
state dict[str, Any]

The initial state data.

required
runtime Runtime

The runtime context of the pipeline execution.

required
config RunnableConfig | None

Configuration for the runnable. Defaults to None.

None

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: This method does not return normally.

Raises:

Type Description
NotImplementedError

Always raised since loop routing requires a graph.

bundle(input_states, output_state, retry_config=None, error_handler=None, cache=None, name=None)

Create a StateOperatorStep to combine multiple state keys.

This function creates a StateOperatorStep that combines multiple keys from the pipeline state into a single output without modifying the data.

Examples:

bundle_step = bundle(["input1", "input2"], "output")
# Produces: {"output": {"input1": state["input1"], "input2": state["input2"]}}

This will cause the step to bundle the values of input1 and input2 from the pipeline state into a single dictionary. The result will be stored in the pipeline state under the key output.

With remapping:

# Provide a mapping of desired output field names to source state keys
# Renames state key "user_id" to "id" in the bundled output
bundle_step = bundle({"id": "user_id"}, "output")
# Produces: {"output": {"id": state["user_id"]}}

Parameters:

Name Type Description Default
input_states list[str] | dict[str, str]

  1. If a list is provided, the listed state keys are bundled as-is (identity mapping).
  2. If a dict is provided, it is treated as a mapping of output field names to source state keys. The bundled result will use the dict keys as field names.

required
output_state str | list[str]

Key(s) to store the bundled data in the pipeline state.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Bundle" followed by a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured to bundle the input states.

copy(input_state, output_state, retry_config=None, error_handler=None, name=None, cache=None)

Create a step that copies values from input state keys to output state keys.

This function creates a StateOperatorStep that copies data from input state(s) to output state(s) without any transformation. The function handles four scenarios:
  1. Single input to single output: direct copy.
  2. Single input to multiple outputs: broadcast the input to all outputs.
  3. Multiple inputs to single output: pack all inputs into a list.
  4. Multiple inputs to multiple outputs: copy each input to the corresponding output (the lists must have the same length).

Parameters:

Name Type Description Default
input_state str | list[str]

Input state key(s) to copy from.

required
output_state str | list[str]

Output state key(s) to copy to.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this step. Defaults to None, in which case the name will be "Copy" followed by a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured to copy the input states to output states.

Raises:

Type Description
ValueError

If both input_state and output_state are lists but have different lengths.

Examples:

Single input to single output:

step = copy("input_data", "output_data")
# Copies value from "input_data" key to "output_data" key

Single input to multiple outputs (broadcast):

step = copy("input_data", ["output1", "output2", "output3"])
# Copies value from "input_data" to all three output keys

Multiple inputs to single output (pack):

step = copy(["input1", "input2", "input3"], "packed_output")
# Packs values from all three input keys into a list at "packed_output"

Multiple inputs to multiple outputs (pairwise):

step = copy(["input1", "input2"], ["output1", "output2"])
# Copies "input1" → "output1" and "input2" → "output2"

With custom name and retry config:

from gllm_core import RetryConfig
retry_cfg = RetryConfig(max_attempts=3, delay=1.0)
step = copy(
    "input_data",
    "output_data",
    name="DataCopyStep",
    retry_config=retry_cfg
)

goto(name, target, input_map=None, output_state=None, allow_end=True, retry_config=None, error_handler=None, cache=None)

Create a GoToStep for dynamic flow control.

This convenience function creates a GoToStep that enables dynamic flow control by jumping to specified nodes in the pipeline.
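For the callable form of target, the function receives the mapped inputs and returns the name of the node to jump to. A sketch with illustrative node names and state keys (the commented-out call shows how it would be wired up):

```python
# A callable jump target for goto(): computes the next node name from the
# mapped inputs. Node names and state keys below are illustrative.
def pick_target(data: dict) -> str:
    return "retry_node" if data["attempts"] < data["max_attempts"] else "END"

# jump = goto(
#     name="router",
#     target=pick_target,
#     input_map={"attempts": "attempts", "max_attempts": Val(3)},
#     output_state="resolved_target",  # optionally record the resolved target
# )

print(pick_target({"attempts": 1, "max_attempts": 3}))  # retry_node
```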

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
target Val | str | Callable

The jump target specification. 1. Val: Static target (unwrapped value) 2. str: State key to read target from 3. Callable: Function that computes the target

required
input_map InputMapSpec | None

Input mapping for callable targets. Only used when target is a callable. Defaults to None.

None
output_state str | None

Optional key to store the resolved target in state. If provided, the resolved target will be saved to this state key. Defaults to None.

None
allow_end bool

Whether jumping to "END" is allowed. If False and target resolves to "END", a ValueError is raised. Defaults to True.

True
retry_config RetryConfig | None

Configuration for retry behavior. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy for error handling. Defaults to None.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

Returns:

Name Type Description
GoToStep GoToStep

A configured GoToStep instance.

guard(condition, success_branch, failure_branch=None, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a GuardStep with a concise syntax.

This function creates a GuardStep that can terminate pipeline execution if a condition is not met.

Examples:

auth_check = lambda state: state["is_authenticated"]
success_step = step(SuccessHandler(), {"input": "input"}, "output")
error_step = step(ErrorHandler(), {"error": "auth_error"}, "error_message")


guard_step = guard(
    auth_check,
    success_branch=success_step,
    failure_branch=error_step,
    input_map={"user_id": "current_user", "model": "auth_model", "strict_mode": Val(True)},
    output_state="auth_result",
)

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
success_branch BasePipelineStep | list[BasePipelineStep]

Steps to execute if condition is True.

required
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None.

None
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
input_map InputMapSpec

Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Guard" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
GuardStep GuardStep

An instance of GuardStep configured with the provided parameters.

if_else(condition, if_branch, else_branch, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a simple ConditionalStep with two branches.

This function creates a ConditionalStep that executes one of two branches based on a condition.

The condition can be either:
  1. A Component that must return exactly "true" or "false".
  2. A callable that returns a string ("true" or "false", case insensitive).
  3. A callable that returns a boolean (converted to "true"/"false").

For boolean conditions and string conditions, True/true/TRUE maps to the if_branch and False/false/FALSE maps to the else_branch.

Examples:

With a Callable condition:

# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["input"] > data["threshold"]

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    condition,
    if_branch,
    else_branch,
    output_state="condition_result",
    input_map={"threshold": Val(0)}
)

This will cause the step to execute the PositiveComponent if the input in the pipeline state is greater than the threshold (0), and the NegativeComponent otherwise. The result of the condition will be stored in the pipeline state under the key condition_result.

With a Component condition:

# Using a Component condition - requires input_map for specifying inputs
threshold_checker = ThresholdChecker()  # A Component that returns "true" or "false"

if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")

if_else_step = if_else(
    threshold_checker,
    if_branch,
    else_branch,
    output_state="condition_result",
    input_map={
        "value": "input",
        "threshold": "threshold_config",
        "strict_mode": Val(True),
    }
)

This will cause the step to execute the ThresholdChecker component with the input from the pipeline state as its value parameter and the threshold_config from runtime configuration as its threshold parameter. Based on the component's result ("true" or "false"), it will execute either the PositiveComponent or the NegativeComponent.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
else_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is false.

required
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
input_map InputMapSpec

Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "IfElse" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

interrupt(message='Pipeline interrupted.', resume_value_map=None, name=None)

Create a specialized step that pauses pipeline execution.

This function creates an InterruptStep that leverages LangGraph's native interrupt behavior to halt execution and return control to the caller.
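When the pipeline is resumed, resume_value_map controls where the resumed value lands in state. The helper below is an illustrative sketch of that mapping, not the actual implementation; storing the same value under each listed key is an assumption:

```python
# Illustrative sketch of how resume_value_map might store the resumed value
# back into the pipeline state (assumption: same value under each key).
def apply_resume_value(resume_value, resume_value_map):
    keys = [resume_value_map] if isinstance(resume_value_map, str) else resume_value_map
    return {key: resume_value for key in keys}

# review = interrupt(
#     message="Awaiting human approval.",
#     resume_value_map="human_feedback",
# )

print(apply_resume_value("approved", "human_feedback"))  # {'human_feedback': 'approved'}
```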

Parameters:

Name Type Description Default
message str | dict[str, Any]

The payload to emit when paused. Defaults to "Pipeline interrupted.".

'Pipeline interrupted.'
resume_value_map str | list[str] | None

State key(s) where the resumed value will be stored. Defaults to None.

None
name str | None

A unique identifier for this pipeline step. Defaults to None.

None

Returns:

Name Type Description
InterruptStep InterruptStep

An instance of InterruptStep.

log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache=None)

Create a specialized step for logging messages.

This function creates a LogStep that logs messages within a pipeline. It can be used to log status updates, debugging information, or any other text during pipeline execution.

The message can be a plain string or a template with placeholders for state variables.

Examples:

Plain message:

log_step = log("Processing document", is_template=False)

Template message with state variables:

log_step = log("Processing query: {query} with model: {model_name}")

Parameters:

Name Type Description Default
message str

The message to be logged. May contain placeholders in curly braces for state variables.

required
is_template bool

Whether the message is a template with placeholders. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Additional keyword arguments to pass to the event emitter. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. If None, a name will be auto-generated with the prefix "log_". Defaults to None.

None

Returns:

Name Type Description
LogStep LogStep

A specialized pipeline step for logging messages.

map_reduce(output_state, map_func, reduce_func=lambda results: results, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a MapReduceStep that maps a function over multiple inputs and reduces the results.

This function creates a step that applies a mapping function to multiple inputs in parallel and combines the results using a reduction function.

The map_func can be either: 1. A callable function that takes a dictionary as input and returns a result. 2. A Component instance, which will be executed with proper async handling.

Important note on parallel execution: 1. For true parallelism, the map_func MUST be an async function or a Component. 2. Synchronous map functions will block the event loop and run sequentially.

The step supports automatic broadcasting of scalar values and handles lists appropriately: 1. If multiple list inputs are provided, they must be the same length. 2. Scalar inputs are broadcasted to match list lengths.

Examples:

Processing a list of items with an async map function:

async def count_words(item):
    await asyncio.sleep(0.1)  # Simulate I/O operation
    return len(item["document"].split())

process_docs = map_reduce(
    output_state="word_counts", # A list of word counts for each document
    map_func=count_words,
    input_map=[{"document": "documents"}],  # A list, e.g. ["doc1...", "doc2...", "doc3..."]
    reduce_func=lambda results: sum(results), # Sum word counts
)

# When executed with {"documents": ["doc1...", "doc2...", "doc3..."]},
# returns {"word_counts": 60} (total word count)

Broadcasting scalar values to match list length:

# Apply a common threshold to multiple values
threshold_check = map_reduce(
    output_state="above_threshold",
    map_func=lambda item: item["value"] > item["threshold"],
    input_map=[{"value": "values", "threshold": "threshold"}],  # "values" is a list, "threshold" is scalar
    reduce_func=lambda results: results  # Return list of boolean results
)
# When executed with {"values": [5, 10, 15], "threshold": 8},
# returns {"above_threshold": [False, True, True]}

Multiple list inputs with the same length:

similarity_step = map_reduce(
    output_state="similarity_scores",
    map_func=lambda item: calculate_similarity(item["doc1"], item["doc2"]),
    input_map=[{"doc1": "documents_a", "doc2": "documents_b"}],  # Both lists of same length
    reduce_func=lambda results: sum(results) / len(results)  # Average similarity
)
# When executed with {"documents_a": ["doc1", "doc2", "doc3"], "documents_b": ["docA", "docB", "docC"]},
# returns {"similarity_scores": 0.75}

Using a Component for complex processing instead of a map function:

summarizer = TextSummarizer() # Subclass of Component
summarize_step = map_reduce(
    output_state="summaries",
    map_func=summarizer,
    input_map=[{"text": "documents", "max_length": "max_length"}],  # List of documents + scalar param
    reduce_func=lambda results: [r["summary"] for r in results]
)
# When executed with {"documents": ["doc1...", "doc2..."], "max_length": 50},
# returns {"summaries": ["summary1...", "summary2..."]}

Parameters:

Name Type Description Default
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_map.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce the mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
input_map InputMapSpec

Direct unified input map. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this step. Defaults to None, in which case the name will be "MapReduce" followed by the map function name.

None

Returns:

Name Type Description
MapReduceStep MapReduceStep

An instance of MapReduceStep configured with the provided parameters.

no_op(name=None)

Create a NoOpStep to add a step that does nothing.

This function creates a NoOpStep, a pass-through step that performs no operation and leaves the state unchanged.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "NoOp" followed by a unique identifier.

None

Returns:

Name Type Description
NoOpStep NoOpStep

An instance of NoOpStep.

parallel(branches, input_states=None, squash=True, input_map=None, error_handler=None, cache=None, retry_config=None, name=None)

Create a ParallelStep that executes multiple branches concurrently.

This function creates a ParallelStep that runs multiple branches in parallel and merges their results. Each branch can be a single step or a list of steps to execute sequentially.

The step supports two execution modes controlled by the squash parameter:

1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. Use for:
   a. Better raw performance
   b. Simpler implementation
   c. Less overhead
   Note that this mode is less transparent for debugging and tracing.
2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. Use for:
   a. More native LangGraph integration
   b. More transparent debugging and tracing
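
In squashed mode, branch execution is conceptually similar to the following simplified sketch (assumed behavior, not the library code; real branches are pipeline steps, not plain coroutines):

```python
import asyncio

async def run_branches_squashed(branches, state):
    # Run all branch coroutines concurrently, then merge their state updates.
    updates = await asyncio.gather(*(branch(state) for branch in branches.values()))
    merged = {}
    for update in updates:
        merged.update(update)
    return merged

async def branch_a(state):
    return {"output_a": state["query"].upper()}

async def branch_b(state):
    return {"output_b": len(state["query"])}

merged = asyncio.run(
    run_branches_squashed({"a": branch_a, "b": branch_b}, {"query": "hello"})
)
# merged == {"output_a": "HELLO", "output_b": 5}
```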

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.

Examples:

Define branches as a list of steps or lists of steps:

parallel_step = parallel(
    branches=[
        step(ComponentA(), {"input": "query"}, "output_a"),
        [
            step(ComponentB1(), {"input": "query"}, "output_b1"),
            step(ComponentB2(), {"input": "output_b1"}, "output_b2")
        ],
        step(ComponentC(), {"input": "query"}, "output_c")
    ],
    input_states=["query"],  # Only 'query' will be passed to branches
)

Define branches as a dictionary: Besides the list format, branches can also be given as a dictionary, which makes it easier to exclude individual branches by key.

parallel_step = parallel(
    branches={
        "branch_a": step(ComponentA(), {"input": "query"}, "output_a"),
        "branch_b": step(ComponentB(), {"input": "query"}, "output_b"),
    },
    input_states=["query"],
)

Parameters:

Name Type Description Default
input_states list[str] | None

Keys from the state to pass to branches. Defaults to None, in which case all state keys will be passed.

None
squash bool

Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel. If False, uses native LangGraph structures for parallelism. Defaults to True.

True
input_map InputMapSpec

Direct unified input map. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this parallel step. Defaults to None, in which case a name will be auto-generated.

None

Returns:

Name Type Description
ParallelStep ParallelStep

An instance of ParallelStep configured with the provided branches.

pause(name=None)

Create a named marker step for static breakpoint targeting.

A PauseStep does not pause execution by itself. Its name is meant to be referenced in the interrupt_before or interrupt_after parameters of Pipeline.invoke().

Parameters:

Name Type Description Default
name str | None

A unique identifier for this step. Defaults to None, in which case a name is auto-generated.

None

Returns:

Name Type Description
PauseStep 'PauseStep'

An instance of PauseStep.

step(component, output_state=None, input_map=None, retry_config=None, error_handler=None, name=None, cache=None)

Create a ComponentStep with a concise syntax.

This function creates a ComponentStep, which wraps a component and manages its inputs and outputs within the pipeline.

Examples:

We can leverage the input_map parameter to specify both state/config keys (as strings) and fixed values (as any type) in a single dictionary.

retriever = Retriever()
retriever_step = step(
    retriever,
    input_map={
        "query": "user_input",
        "top_k": "config_top_k",
        "conversation_id": Val(<fixed_value>),
    },
    output_state="retrieved_data",
)

This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the user_input from the pipeline state to the query argument of the Retriever.
2. It will pass the config_top_k from the runtime configuration to the top_k argument of the Retriever.
3. It will pass the fixed value <fixed_value> to the conversation_id argument of the Retriever.
4. It will store the Retriever result in the pipeline state under the key retrieved_data.
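
To illustrate how a list-form input_map resolves against merged state data, here is a rough, hypothetical sketch (not the library's actual resolution code; the Val class mirrors the library's fixed-value wrapper):

```python
class Val:
    """Wrapper marking a fixed value (mirrors the library's Val)."""
    def __init__(self, value):
        self.value = value

def resolve_input_map(input_map: list, data: dict) -> dict:
    kwargs = {}
    for entry in input_map:
        if isinstance(entry, str):           # 1. identity mapping
            kwargs[entry] = data[entry]
        else:
            for arg, source in entry.items():
                if isinstance(source, Val):  # 3. fixed argument
                    kwargs[arg] = source.value
                else:                        # 2. state/config mapping
                    kwargs[arg] = data[source]
    return kwargs

kwargs = resolve_input_map(
    ["query", {"top_k": "config_top_k"}, {"lang": Val("en")}],
    {"query": "hello", "config_top_k": 5},
)
# kwargs == {"query": "hello", "top_k": 5, "lang": "en"}
```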

Parameters:

Name Type Description Default
component Component

The component to be executed in this step.

required
output_state str | list[str] | None

Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None.

None
input_map InputMapSpec

Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be the component's class name followed by a unique identifier.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

Returns:

Name Type Description
ComponentStep ComponentStep

An instance of ComponentStep configured with the provided parameters.

subgraph(subgraph, output_state_map=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a SubgraphStep that executes another pipeline as a subgraph.

This function creates a SubgraphStep that allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.

The SubgraphStep gracefully handles missing state keys - if a key specified in input_map is not present in the parent state, it will be omitted from the subgraph input rather than causing an error. This allows for flexible composition of pipelines with different state schemas.
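
The graceful key mapping described above can be sketched as follows (a hypothetical illustration, not the library source):

```python
# Build the subgraph's input state from the parent state;
# keys missing from the parent state are simply omitted.
def build_subgraph_input(input_map: dict, parent_state: dict) -> dict:
    return {
        sub_key: parent_state[parent_key]
        for sub_key, parent_key in input_map.items()
        if parent_key in parent_state
    }

sub_input = build_subgraph_input(
    {"query": "query", "model": "retrieval_model"},
    {"query": "find docs"},  # 'retrieval_model' absent -> omitted
)
# sub_input == {"query": "find docs"}
```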

Examples:

from gllm_pipeline.utils.typing_compat import TypedDict
from gllm_pipeline.pipeline.pipeline import Pipeline

# Define state schemas using TypedDict
class SubgraphState(TypedDict):
    query: str
    retrieved_data: list
    reranked_data: list

class ParentState(TypedDict):
    user_input: str
    query: str
    reranked: list
    response: str

# Define a subgraph pipeline with its own state schema
subgraph_pipeline = Pipeline(
    [
        step(Retriever(), {"query": "query"}, "retrieved_data"),
        step(Reranker(), {"data": "retrieved_data"}, "reranked_data")
    ],
    state_type=SubgraphState
)

# Use the subgraph in a parent pipeline
parent_pipeline = Pipeline(
    [
        step(QueryProcessor(), {"input": "user_input"}, "query"),
        subgraph(
            subgraph_pipeline,
            input_map={"query": "query", "model": "retrieval_model", "top_k": Val(10)},
            output_state_map={"reranked": "reranked_data"},
        ),
        step(ResponseGenerator(), {"data": "reranked"}, "response")
    ],
    state_type=ParentState
)

# When the parent pipeline runs:
# 1. QueryProcessor processes user_input and produces query
# 2. SubgraphStep creates a new state for the subgraph with query from parent
# 3. Subgraph executes its steps (Retriever → Reranker)
# 4. SubgraphStep maps reranked_data from subgraph to reranked in parent
# 5. ResponseGenerator uses reranked to produce response

Parameters:

Name Type Description Default
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
output_state_map dict[str, str] | None

Mapping of parent pipeline state keys to subgraph output keys. If None, all subgraph outputs will be passed as-is.

None
input_map InputMapSpec | None

Direct unified input map. If provided, it is used directly; otherwise it will be synthesized from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier.

None

Returns:

Name Type Description
SubgraphStep SubgraphStep

An instance of SubgraphStep configured with the provided parameters.

switch(condition, branches, output_state=None, default=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a ConditionalStep with multiple branches.

This function creates a ConditionalStep that can execute one of multiple branches based on a condition.

Examples:

With a Callable condition:

# Using a Callable condition - receives merged state and config directly
def extract_command(data):
    # Access both state and config in a single dictionary
    query = data["query"]
    separator = data["separator"]  # From runtime config or state
    return query.split(separator)[0]

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
}
default = step(NoOpComponent(), {}, "no_op_result")

switch_step = switch(
    extract_command,
    branches,
    input_map={"separator": Val(" ")},
    output_state="command_type",
    default=default,
)

This will cause the step to execute the SearchComponent if the first part of the query in the pipeline state is "search", the FilterComponent if it is "filter", and the NoOpComponent otherwise. The separator is provided as a fixed argument. The result of the condition will be stored in the pipeline state under the key command_type.

With a Component condition:

# Using a Component condition - requires input_map for specifying inputs
command_extractor = CommandExtractor()  # A Component that extracts command from query

branches = {
    "search": step(SearchComponent(), {"query": "query"}, "search_result"),
    "filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
    "sort": step(SortComponent(), {"query": "query"}, "sort_result"),
}
default = step(DefaultComponent(), {"query": "query"}, "default_result")

switch_step = switch(
    command_extractor,
    branches,
    input_map={"text": "query", "delimiter": "separator_config", "lowercase": Val(True)},
    output_state="command_type",
    default=default,
)

This will cause the step to execute the CommandExtractor component with the query from the pipeline state as its text parameter and the separator_config from runtime configuration as its delimiter parameter. Based on the component's result (which should be one of "search", "filter", "sort", or something else), it will execute the corresponding branch component or the default component.
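
The routing itself amounts to a dictionary lookup with a fallback; roughly (a hypothetical sketch, not the library source, with strings standing in for branch steps):

```python
# Hypothetical sketch of branch selection in a ConditionalStep.
def select_branch(condition_result, branches, default=None):
    # Execute the step registered for the condition result,
    # or fall back to the default branch when nothing matches.
    return branches.get(condition_result, default)

branches = {"search": "search_step", "filter": "filter_step"}
chosen = select_branch("filter", branches, default="default_step")
fallback = select_branch("sort", branches, default="default_step")
# chosen == "filter_step"; fallback == "default_step"
```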

Parameters:

Name Type Description Default
condition ConditionType

The condition to evaluate for branch selection.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
default BasePipelineStep | list[BasePipelineStep] | None

Default branch to execute if no condition matches. Defaults to None.

None
input_map InputMapSpec

Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Switch" followed by the condition's function name and a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

terminate(name=None, retry_config=None, error_handler=None, cache=None)

Create a TerminatorStep to end pipeline execution.

This function creates a TerminatorStep that explicitly terminates a branch or the entire pipeline.

Examples:

early_exit = terminate("early_exit")

pipeline = (
    step_a
    | if_else(should_stop, early_exit, step_b)
    | step_c
)

Parameters:

Name Type Description Default
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Terminator" followed by a unique identifier.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None

Returns:

Name Type Description
TerminatorStep TerminatorStep

An instance of TerminatorStep.

toggle(condition, if_branch, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a ConditionalStep that toggles between a branch and a no-op.

This function creates a ConditionalStep that executes a branch if the condition is true, and does nothing (no-op) if the condition is false.

The condition can be:

1. A Component that must return exactly "true" or "false"
2. A callable that returns a string ("true" or "false", case insensitive)
3. A callable that returns a boolean (will be converted to "true"/"false")
4. A string key that will be looked up in the merged state data (state + runtime config + fixed args). The value will be evaluated for truthiness: any non-empty, non-zero, non-False value will be considered True.
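
How these condition forms might be normalized to a boolean can be sketched as follows (assumed behavior, not the library source):

```python
def normalize_condition(condition, data: dict) -> bool:
    # Callable conditions may return a bool or a "true"/"false" string.
    if callable(condition):
        result = condition(data)
        if isinstance(result, str):
            return result.lower() == "true"
        return bool(result)
    # String keys are looked up in the merged state data
    # and evaluated for truthiness.
    return bool(data.get(condition))

# normalize_condition(lambda d: "TRUE", {})      -> True
# normalize_condition("flag", {"flag": 0})       -> False
# normalize_condition("flag", {"flag": "on"})    -> True
```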

Examples:

With a Callable condition:

# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["feature_enabled"] and data["user_tier"] >= 2
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    condition,
    feature_step,
    output_state="feature_status",
    input_map={"user_tier": Val(2)},
)

This will execute the FeatureComponent only if both feature_enabled is true and user_tier is at least 2. Otherwise, it will do nothing. The condition result will be stored in the pipeline state under the key feature_status.

With a Component condition:

# Using a Component condition - requires input_map for specifying inputs
feature_checker = FeatureChecker()  # A Component that returns "true" or "false"
feature_step = step(FeatureComponent(), {"input": "input"}, "output")

toggle_step = toggle(
    feature_checker,
    feature_step,
    output_state="feature_status",
    input_map={"user_id": "current_user", "feature_name": "target_feature", "check_permissions": Val(True)},
)

This will cause the step to execute the FeatureChecker component with the current_user from the pipeline state as its user_id parameter and the target_feature from runtime configuration as its feature_name parameter. Based on the component's result ("true" or "false"), it will either execute the FeatureComponent or do nothing.

Parameters:

Name Type Description Default
condition ConditionType | Callable[[dict[str, Any]], bool] | str

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
output_state str | None

Key to store the condition result in the pipeline state. Defaults to None.

None
input_map InputMapSpec | None

Direct unified input map. If provided, it is used directly; otherwise it will be synthesized from maps. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Toggle" followed by a unique identifier.

None

Returns:

Name Type Description
ConditionalStep ConditionalStep

An instance of ConditionalStep configured with the provided parameters.

transform(operation, input_states=None, *, output_state, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a StateOperatorStep for transforming state data.

This function creates a StateOperatorStep that applies a transformation operation to the pipeline state. Note that the function operation should accept a dictionary of input data and return the operation result.

Examples:

def sort(data: dict) -> list:
    is_reverse = data["reverse"]
    return sorted(data["chunk"], reverse=is_reverse)

transform_step = transform(
    operation=sort,
    input_map=["chunk", {"reverse": Val(True)}],
    output_state="output",
)

This will cause the step to execute the sort operation on the chunk in the pipeline state. The result will be stored in the pipeline state under the key output. The sort order is controlled by the fixed argument reverse, provided via Val(True).

Parameters:

Name Type Description Default
operation Callable[[dict[str, Any]], Any]

The operation to execute on the input data.

required
input_states list[str] | None

List of input state keys required by the operation. Defaults to None.

None
output_state str | list[str]

Key(s) to store the operation result in the pipeline state. This parameter is required.

required
input_map InputMapSpec

Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
name str | None

A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Transform" followed by the operation's function name and a unique identifier.

None

Returns:

Name Type Description
StateOperatorStep StateOperatorStep

An instance of StateOperatorStep configured with the provided parameters.

while_do(body, condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache=None, name=None)

Create a WhileDoStep with a concise syntax.

This function creates a do-while loop construct in the pipeline graph. The loop body executes at least once, and a condition node at the end determines whether to loop back to the entry point.

The condition can be either: 1. A Component that must return exactly "true" or "false". 2. A callable that returns a boolean.
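
Examples:

A condition callable receives the merged state data and returns a boolean. For instance, the following condition loops until an attempt counter reaches a limit (RefinerComponent and the state keys are illustrative, not part of the library):

```python
# Loop while fewer than 3 attempts have been recorded in the state.
def should_continue(data: dict) -> bool:
    return data.get("attempts", 0) < 3

# The loop construct would then be assembled roughly as:
# loop_step = while_do(
#     body=step(RefinerComponent(), {"draft": "draft"}, "draft"),
#     condition=should_continue,
#     output_state="should_retry",
# )
```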

Parameters:

Name Type Description Default
body BasePipelineStep | list[BasePipelineStep]

The loop body. Executes at least once.

required
condition LoopConditionType

Evaluated after body execution. If truthy, jumps back to loop entry; if falsy, exits loop.

required
input_map InputMapSpec | None

Unified input map. Defaults to None.

None
output_state str | None

Key to store the condition result in state. Defaults to None.

None
retry_config RetryConfig | None

Retry config for condition. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache CacheConfig | None

Caching configuration. Defaults to None.

None
name str | None

Unique ID for this step. Defaults to None.

None

Returns:

Name Type Description
WhileDoStep WhileDoStep

An instance of WhileDoStep.