Steps
Contains the base classes and implementations of pipeline steps.
ComponentStep(name, component, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that executes a specific component.
This step wraps a component, manages its inputs and outputs, and integrates it into the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| component | Component | The component to be executed in this step. |
| input_map | dict[str, str \| Val] \| None | Unified input map. |
| output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. |
Initializes a new ComponentStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| component | Component | The component to be executed in this step. | required |
| output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None. | None |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
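To illustrate the three input_map list forms, here is a minimal sketch of how such a spec might be resolved against the pipeline state and config. This is an illustrative stand-in, not the library's actual resolution code; the `Val` dataclass here only mimics the role of the library's `Val` wrapper.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Val:
    """Stand-in for the library's Val wrapper marking a fixed argument."""

    value: Any


def resolve_input_map(spec: list, state: dict, config: dict) -> dict:
    """Resolve an InputMapSpec-style list into component kwargs (sketch)."""
    merged = {**config, **state}  # assumption: state keys shadow config keys
    kwargs: dict[str, Any] = {}
    for item in spec:
        if isinstance(item, str):  # 1. identity mapping: same key in and out
            kwargs[item] = merged[item]
        elif isinstance(item, dict):
            for arg, source in item.items():
                if isinstance(source, Val):  # 3. fixed argument
                    kwargs[arg] = source.value
                else:  # 2. state/config key mapping
                    kwargs[arg] = merged[source]
    return kwargs


kwargs = resolve_input_map(
    ["query", {"top_k": "retrieval_top_k"}, {"mode": Val("strict")}],
    state={"query": "hello", "retrieval_top_k": 5},
    config={},
)
```

With this spec, the component would be called with `query="hello"`, `top_k=5`, and `mode="strict"`.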
execute(state, runtime, config=None)
async
Executes the component and processes its output.
This method validates inputs, prepares data, executes the component, and formats the output for integration into the pipeline state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline, containing all data. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | The update to the pipeline state after this step's operation, or None if output_state is None. When not None, this includes new or modified data produced by the component, not the entire state. |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If an error occurs during component execution. |
| TimeoutError | If the component execution times out. |
| CancelledError | If the component execution is cancelled. |
ConditionalStep(name, branches, condition=None, output_state=None, condition_aggregator=lambda x: ';'.join((str(i)) for i in x), input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BranchingStep, HasInputsMixin
A conditional pipeline step that conditionally executes different branches based on specified conditions.
This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.
A minimal usage requires defining the branches to execute based on a condition, which is a callable
that takes input from the state and returns a string identifying the branch to execute.
The condition can be a Component or a Callable. The handling of inputs differs:
1. If the condition is a Component, input_map is used to map the pipeline's
state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the
pipeline's state and config directly. In this case, input_map is not used
to build the payload and should not be passed.
Examples:
```python
ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__",
)
```
This will execute step_a if the query contains "<A>", and step_b otherwise.
The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute
if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches,
the step will raise an error.
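When multiple conditions are supplied, their results are combined by the condition_aggregator into a single branch key. A minimal sketch of this behavior, using the documented default aggregator (the condition functions and branch names here are illustrative, not part of the API):

```python
# Two hypothetical conditions, each returning part of the branch key.
def is_urgent(inputs: dict) -> str:
    return "urgent" if "asap" in inputs["query"] else "normal"


def language(inputs: dict) -> str:
    return "id" if inputs.get("lang") == "id" else "en"


# The documented default aggregator joins results with a semicolon.
aggregate = lambda results: ";".join(str(r) for r in results)

inputs = {"query": "please review asap", "lang": "en"}
branch_key = aggregate([is_urgent(inputs), language(inputs)])  # "urgent;en"
```

Under this scheme the branches dict would be keyed by aggregated strings such as "urgent;en", with DEFAULT_BRANCH as the fallback.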
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps. |
| condition | list[ConditionType] \| None | The condition(s) to evaluate for branch selection. |
| input_map | dict[str, str \| Val] \| None | Unified input map. |
| output_state | str \| None | Key to store the condition result in the state, if desired. |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. |
Initializes a new ConditionalStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this step. | required |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | A dictionary mapping condition results to steps or lists of steps to execute. | required |
| condition | ConditionType \| list[ConditionType] \| None | The condition(s) to evaluate for branch selection. | None |
| output_state | str \| None | Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None. | None |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";"). | lambda x: ';'.join(str(i) for i in x) |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val\|Group) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args; 4. dict[str, Group] to wrap lookup/literal specs while keeping conditional behavior unchanged. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
execute(state, runtime, config=None)
async
Executes the conditional step, determines the route, and returns a Command.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| Command | Command | A LangGraph Command object with 'goto' for routing and 'update' for state changes. |
execute_direct(state, runtime, config=None)
async
Execute this step directly, handling both branch selection and execution.
This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | dict[str, Any] | The current state of the pipeline. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
select_path(state, runtime)
async
Determines the logical route key based on the evaluated condition(s).
This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
dict[str, Any]
|
The current state of the pipeline, containing all data. |
required |
runtime
|
Runtime
|
Runtime information for this step's execution. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
str |
str
|
The identifier of the selected logical route. |
Raises:
| Type | Description |
|---|---|
Exception
|
May raise exceptions from condition evaluation. |
EmptyStepErrorHandler(output_state)
Bases: BaseStepErrorHandler
Strategy that replaces the value(s) of the configured output state key(s) with None on error.
Attributes:
| Name | Type | Description |
|---|---|---|
| output_state | list[str] | Output key(s) to map input values to. |
Initialize the strategy with the output state mapping.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output_state | str \| list[str] | Output key(s) to map input values to. Can be a single string or a list of strings. | required |
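The resulting state update is straightforward; a one-line sketch of the documented behavior (the key names here are illustrative):

```python
# On error, each configured output key is reset to None in the state update.
output_state = ["answer", "citations"]
update = {key: None for key in output_state}
```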
FallbackStepErrorHandler(fallback)
Bases: BaseStepErrorHandler
Strategy that executes a fallback callable on error.
Attributes:
| Name | Type | Description |
|---|---|---|
| fallback | Callable[[BaseException, PipelineState, Runtime, ErrorContext], dict[str, Any] \| None] | A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state. |
Initialize the strategy with a fallback callable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fallback | Callable[[BaseException, PipelineState, Runtime, ErrorContext], dict[str, Any] \| None] | A callable that generates the fallback state dynamically. It should accept (error, state, runtime, context) and return a fallback state. | required |
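A sketch of a fallback callable matching the documented signature, i.e. one that accepts (error, state, runtime, context) and returns a fallback state. The state keys used here ("answer", "last_error") are illustrative, not part of the API:

```python
from typing import Any


def degrade_gracefully(
    error: BaseException, state: dict, runtime: Any, context: Any
) -> dict:
    """Log-and-degrade fallback: record the error and return a safe answer."""
    return {"answer": "Sorry, something went wrong.", "last_error": str(error)}


update = degrade_gracefully(ValueError("boom"), {"query": "q"}, None, None)
```

Such a callable would then be passed as `FallbackStepErrorHandler(fallback=degrade_gracefully)`.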
GoToStep(name, target, input_map=None, output_state=None, allow_end=True, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that enables dynamic flow control by jumping to specified nodes.
This step allows for dynamic routing in pipelines by jumping to different nodes based on runtime conditions. The target can be a static node name, a state key, or a callable that determines the target at runtime.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for the pipeline step. |
target |
str | Val | Callable[[dict[str, Any]], str]
|
The target node specification. |
input_map |
dict[str, str | Val]
|
Mapping of argument names to either a state/config key (str) or a fixed value (Val). Inherited from HasInputsMixin. |
output_state |
str | None
|
The state key to store the resolved target. |
allow_end |
bool
|
Whether jumping to 'END' is allowed. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. Inherited from BasePipelineStep. |
cache_store |
Any | None
|
The cache store used for caching step results, if configured. Inherited from BasePipelineStep. |
is_cache_enabled |
bool
|
Property indicating whether caching is enabled for this step. Inherited from BasePipelineStep. |
Initializes a new GoToStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| target | str \| Val \| Callable[[dict[str, Any]], str] | The target node to jump to. Can be a literal string node name, a Val containing the node name, or a callable that takes the resolved inputs and returns the target node name. | required |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args. Defaults to None. | None |
| output_state | str \| None | Key to store the target node name in the pipeline state. Defaults to None. | None |
| allow_end | bool | Whether to allow jumping to the END node. Defaults to True. | True |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
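A callable target receives the resolved inputs and returns the name of the node to jump to. A minimal sketch (the node names "summarize" and "respond" and the state key "document" are illustrative):

```python
def pick_target(inputs: dict) -> str:
    """Route long documents to a summarization node, short ones straight to response."""
    return "summarize" if len(inputs["document"]) > 1000 else "respond"


target = pick_target({"document": "short text"})  # "respond"
```

Such a callable could then be wired in as, roughly, `GoToStep(name="router", target=pick_target, input_map=["document"])`, though the exact wiring depends on your pipeline.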
add_to_graph(graph, previous_endpoints, retry_policy=None)
Adds the GoToStep to the graph and creates edges.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph | Any | The LangGraph graph to add the step to. | required |
| previous_endpoints | list[str] | List of source node names to connect from. | required |
| retry_policy | RetryPolicy \| None | Optional retry policy to override the step's default. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| list[str] | List of endpoint names after adding this step. |
apply_exclusions(exclusions)
Applies exclusions to this goto step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exclusions | ExclusionSet | The exclusion set to apply. | required |
execute(state, runtime, config=None)
async
Executes the goto step and returns a LangGraph Command.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current pipeline state. | required |
| runtime | Runtime | The runtime context containing config. | required |
| config | RunnableConfig \| None | Optional runnable configuration. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| Command | Command | A LangGraph Command with goto parameter for routing. |

Raises:
| Type | Description |
|---|---|
| ValueError | If the resolved target is invalid. |
| KeyError | If the state key is missing for string targets. |
execute_direct(state, runtime, config=None)
async
Executes the goto step and returns a state update.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current pipeline state. | required |
| runtime | Runtime | The runtime context containing config. | required |
| config | RunnableConfig \| None | Optional runnable configuration. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | State update with the resolved target if output_state is specified, None otherwise. |

Raises:
| Type | Description |
|---|---|
| ValueError | If the resolved target is invalid. |
| KeyError | If the state key is missing for string targets. |
get_mermaid_diagram(indent=0)
Generates a Mermaid diagram representation of this goto step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| indent | int | Number of spaces to indent the diagram. Defaults to 0. | 0 |

Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The Mermaid diagram string. |
GuardStep(name, condition, success_branch, failure_branch=None, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: ConditionalStep
A conditional step that can terminate pipeline execution if a condition is not met.
This step evaluates a condition and either: 1. Continues execution through the success_branch if the condition is True 2. Executes the failure_branch and terminates if the condition is False
Examples:
```python
pipeline = (
    step_a
    | GuardStep(
        name="auth_check",
        condition=lambda x: x["is_authenticated"],
        success_branch=step_b,
        failure_branch=error_handling_step,
    )
    | step_c
)
```
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| condition | ConditionType | The condition to evaluate. |
| input_map | dict[str, str \| Any] \| None | Unified input map. |
| success_branch | BasePipelineStep \| list[BasePipelineStep] | Steps to execute if the condition is True. |
| failure_branch | BasePipelineStep \| list[BasePipelineStep] \| None | Steps to execute if the condition is False. If None, the pipeline terminates immediately on a False condition. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
Initializes a new GuardStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| condition | ConditionType | The condition to evaluate. | required |
| success_branch | BasePipelineStep \| list[BasePipelineStep] | Steps to execute if the condition is True. | required |
| failure_branch | BasePipelineStep \| list[BasePipelineStep] \| None | Steps to execute if the condition is False. If None, the pipeline terminates immediately. Defaults to None. | None |
| output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args. Defaults to None. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
InterruptStep(name, message='Pipeline interrupted.', resume_value_map=None, **kwargs)
Bases: BasePipelineStep
A pipeline step that pauses execution using LangGraph's interrupt feature.
When the pipeline reaches this step, it will throw an interrupt which pauses
execution and returns control to the caller (along with the current state).
To resume, the user must invoke the pipeline again with a Command(resume=...)
payload and the same thread_id.
The value returned from the interrupt() call (which is whatever the user
provides in Command(resume=val)) will be stored in the state key specified
by resume_value_map if provided.
Initializes an InterruptStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The unique identifier for this step. | required |
| message | str \| dict[str, Any] | The payload to emit when the interrupt occurs. Defaults to "Pipeline interrupted.". | 'Pipeline interrupted.' |
| resume_value_map | str \| list[str] \| None | State key(s) where the resume value will be stored. Defaults to None. | None |
| **kwargs | Any | Additional configuration (e.g., error_handler) for the BasePipelineStep. | {} |

Raises:
| Type | Description |
|---|---|
| ValueError | If … |
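To illustrate the resume_value_map behavior described above, here is a pure-Python sketch (an assumption about the mapping semantics, not the library's code) of how a resume value might be fanned out to the configured state key(s):

```python
def apply_resume_value(resume_value, resume_value_map):
    """Sketch: store the resume value under each configured state key."""
    if resume_value_map is None:
        return None  # no mapping configured, no state update
    keys = (
        [resume_value_map]
        if isinstance(resume_value_map, str)
        else list(resume_value_map)
    )
    return {key: resume_value for key in keys}


# The user resumed with Command(resume="approved"); map it into the state.
update = apply_resume_value("approved", "feedback")  # {"feedback": "approved"}
```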
execute(state, runtime, config=None)
async
Execute the interrupt and handle the resumed value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current pipeline state. | required |
| runtime | Runtime | The pipeline runtime. | required |
| config | RunnableConfig \| None | The RunnableConfig. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | State updates containing the resumed value if a resume_value_map is provided, None otherwise. |

Raises:
| Type | Description |
|---|---|
| TypeError | If … |
KeepStepErrorHandler()
Bases: BaseStepErrorHandler
Strategy that preserves the current state on error.
Initialize the keep error handler.
LogStep(name, message, is_template=True, emit_kwargs=None, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep
A specialized pipeline step for logging messages.
This step uses the Messenger component to log messages during pipeline execution. It supports both plain text messages and template messages with placeholders for state variables.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| messenger | Messenger | The messenger component used to format and send messages. |
| emit_kwargs | dict[str, Any] | Additional arguments to pass to the event emitter. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
Initializes a new LogStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| message | str | The message to be logged. May contain placeholders in curly braces for state variables. | required |
| is_template | bool | Whether the message is a template with placeholders. Defaults to True. | True |
| emit_kwargs | dict[str, Any] \| None | Additional keyword arguments to pass to the event emitter. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
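The template syntax uses curly-brace placeholders filled from the pipeline state. LogStep delegates the actual formatting to its Messenger component; plain `str.format` is used below only to illustrate the placeholder syntax, and the message and state keys are illustrative:

```python
# A template message with two placeholders drawn from the pipeline state.
message = "Retrieved {num_chunks} chunks for query: {query}"
state = {"num_chunks": 3, "query": "pricing"}
formatted = message.format(**state)
```

With `is_template=False`, the message would instead be emitted verbatim, braces and all.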
execute(state, runtime, config=None)
async
Executes the log step by formatting and emitting the message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline, containing all data. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| None | None | This step does not modify the pipeline state. |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If an error occurs during message emission. |
MapReduceStep(name, output_state, map_func, reduce_func=lambda results: results, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep, HasInputsMixin
A step that applies a mapping function to multiple inputs and reduces the results.
This step performs parallel processing of multiple input items using: 1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_map). 2. A reduce function that combines all the mapped results.
Note on parallel execution: 1. For true parallelism, the map_func MUST be an async function or a Component. 2. Synchronous map functions will block the event loop and run sequentially.
Input handling: 1. Automatically detects which inputs are lists/sequences. 2. Ensures all list inputs have the same length. 3. Broadcasts scalar values to match list lengths. 4. If no list inputs, applies the map function once to the whole input.
Internally, this step uses asyncio.gather() for efficient parallel execution.
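The input handling and gather-based fan-out described above can be sketched in pure Python. This is an illustrative reimplementation of the documented behavior, not the library's code; `double` is a hypothetical async map function:

```python
import asyncio


async def map_reduce(inputs: dict, map_func, reduce_func):
    """Sketch: broadcast scalars against list inputs, then map in parallel."""
    list_keys = [k for k, v in inputs.items() if isinstance(v, (list, tuple))]
    if not list_keys:
        # No list inputs: apply the map function once to the whole input.
        return reduce_func([await map_func(inputs)])
    lengths = {len(inputs[k]) for k in list_keys}
    if len(lengths) != 1:
        raise ValueError("All list inputs must have the same length")
    n = lengths.pop()
    # Build one input dict per item, broadcasting scalar values.
    items = [
        {k: (inputs[k][i] if k in list_keys else inputs[k]) for k in inputs}
        for i in range(n)
    ]
    # Async map functions run concurrently via asyncio.gather().
    results = await asyncio.gather(*(map_func(item) for item in items))
    return reduce_func(list(results))


async def double(item: dict) -> int:
    return item["x"] * item["scale"]


result = asyncio.run(map_reduce({"x": [1, 2, 3], "scale": 10}, double, sum))
```

Here "x" is a list input and "scale" a broadcast scalar, so `double` sees three items and `sum` reduces the mapped results to 60.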
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this step. |
| map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. Will run in parallel if the function is asynchronous. |
| reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. |
| input_map | InputMapSpec \| None | Unified input map. |
| output_state | str | Key to store the reduced result in the pipeline state. |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. |
Initialize a new MapReduceStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this step. | required |
| output_state | str | Key to store the reduced result in the pipeline state. | required |
| map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_map. | required |
| reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. Defaults to a function that returns the list of results as is. | lambda results: results |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val\|Group) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args; 4. dict[str, Group] to keep wrapped lookup/literal values intact across fan-out. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
execute(state, runtime, config=None)
async
Execute the map and reduce operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] | The reduced result stored under output_state. |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If an error occurs during execution. |
NoOpStep(name, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep
A step that does nothing.
This step is useful when you want to add a step that does not perform any processing. For example, you can use this step to implement a toggle pattern for a certain component.
Examples:
```python
pipeline = (
    step_a
    | ConditionalStep(
        name="branch",
        branches={
            "execute": step_b,
            "continue": NoOpStep("no_op"),
        },
        condition=lambda x: "execute" if x["should_execute"] else "continue",
    )
    | step_c
)
```
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
execute(state, runtime, config=None)
async
Executes this step, which does nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline. | required |
| runtime | Runtime | Runtime information for this step's execution. | required |
| config | RunnableConfig \| None | The runnable configuration. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| None | None | This step does not modify the pipeline state. |
ParallelStep(name, branches, input_states=None, squash=True, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BranchingStep, HasInputsMixin
A pipeline step that executes multiple branches in parallel.
This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.
The step supports two execution modes controlled by the squash parameter:
1. Squashed (default): Uses asyncio.gather() to run branches in parallel within a single LangGraph node. Use for:
a. Better raw performance
b. Simpler implementation
c. Less overhead
d. Less transparent for debugging and tracing
2. Expanded (squash=False): Creates a native LangGraph structure with multiple parallel paths. Use for:
a. More native LangGraph integration
b. More transparent for debugging and tracing
For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
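The squashed mode's gather-and-merge behavior can be sketched in pure Python. This is an illustrative model, not the library's merge logic (in particular, the "later branches win on key conflicts" rule here is an assumption of the sketch, and the branch functions are hypothetical):

```python
import asyncio


async def branch_a(state: dict) -> dict:
    return {"summary": state["text"][:5]}


async def branch_b(state: dict) -> dict:
    return {"length": len(state["text"])}


async def run_parallel(state: dict, branches: list) -> dict:
    """Run all branches concurrently and merge their state updates."""
    updates = await asyncio.gather(*(branch(state) for branch in branches))
    merged: dict = {}
    for update in updates:
        if update:
            merged.update(update)  # assumption: later branches win on conflicts
    return merged


merged = asyncio.run(run_parallel({"text": "hello world"}, [branch_a, branch_b]))
```

In expanded mode (squash=False) the same fan-out/fan-in is instead expressed as parallel paths in the LangGraph graph itself.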
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| branches | dict[str, PipelineSteps] | The branches to execute in parallel. |
| input_map | dict[str, str \| Val] \| None | Unified input map. |
| squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel, creating a single node. If False, uses native LangGraph structures for parallelism, creating multiple nodes. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
Initialize a new ParallelStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| branches | list \| dict[str, PipelineSteps] | The branches to execute in parallel. List format: each branch is a single step or a list of steps to execute sequentially, e.g. [step1, [step2, step3], step4]. Dict format: keys are branch names, values are a single step or a list of steps, e.g. {"analysis": step1, "validation": [step2, step3], "cleanup": step4}; enables more intuitive step exclusion using branch names. | required |
| input_states | list[str] \| None | Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None. | None |
| squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel, creating a single node. If False, uses native LangGraph structures for parallelism, creating multiple nodes. Defaults to True. | True |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping; 2. dict[str, str] for state/config mapping; 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
add_to_graph(graph, previous_endpoints, retry_policy=None)
Handle both squashed and expanded modes.
For squashed mode: add the parallel step as a single node. For expanded mode: add the parallel step as a single node and add children to graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph | StateGraph | The graph to add this step to. | required |
| previous_endpoints | list[str] | Endpoints from previous steps to connect to. | required |
| retry_policy | RetryPolicy \| None | Retry policy to propagate to child steps. Defaults to None, in which case the retry policy of the step is used. | None |

Returns:
| Type | Description |
|---|---|
| list[str] | Exit points after adding all child steps. |
execute(state, runtime, config=None)
async
Execute all branches in parallel and merge their results.
This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current state of the pipeline. |
required |
runtime
|
Runtime
|
Runtime information for this step's execution. |
required |
config
|
RunnableConfig | None
|
The runnable configuration. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
dict[str, Any] | None: The merged results from all parallel branches, or None if no updates were produced. |
Raises:
| Type | Description |
|---|---|
CancelledError
|
If execution is cancelled, preserved with added context. |
BaseInvokerError
|
If an error occurs during LM invocation. |
RuntimeError
|
For all other exceptions during execution, wrapped with context information. |
TimeoutError
|
If execution times out, preserved with added context. |
ValidationError
|
If input validation fails. |
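The squashed mode described above can be sketched in plain Python. This is an illustrative reduction of the idea, not the library's actual implementation: each branch receives a copy of the state, all branches run concurrently via asyncio.gather(), and their partial updates are merged into a single state update.

```python
import asyncio


async def run_squashed(branches, state):
    # Each branch gets its own copy of the state and returns a partial update.
    updates = await asyncio.gather(*(branch(dict(state)) for branch in branches))
    merged = {}
    for update in updates:
        if update:
            merged.update(update)
    # Mirror the documented behavior: None when no updates were produced.
    return merged or None


# Illustrative branch functions (not part of the library API).
async def analyze(state):
    return {"analysis": len(state["text"])}


async def validate(state):
    return {"valid": state["text"].isascii()}


result = asyncio.run(run_squashed([analyze, validate], {"text": "hello"}))
# result == {"analysis": 5, "valid": True}
```

For true parallelism the branches must be async; synchronous branches would serialize on the event loop, which is why the expanded mode (squash=False) delegates parallelism to native LangGraph structures instead.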
PauseStep(name, **kwargs)
Bases: BasePipelineStep
A named marker step for use as a static breakpoint target.
PauseStep does not pause execution by itself. It exists only so
that its name can be referenced in interrupt_before or
interrupt_after lists when calling Pipeline.invoke().
This is the recommended pattern for debugging: insert one or more
PauseStep instances at interesting points in the pipeline, then
selectively activate breakpoints at invocation time without recompiling
the graph.
Examples:
from gllm_pipeline.steps import pause, step
from gllm_pipeline.pipeline import Pipeline
bp = pause(name="before_llm")
pipeline = Pipeline([preprocess, bp, llm_step, postprocess])
# Debug run — pause right before the LLM call:
result = await pipeline.invoke(state, interrupt_before=["before_llm"])
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
Initialize a PauseStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A unique identifier for this step. This name is used
as the target for interrupt_before or interrupt_after. |
required |
**kwargs
|
Any
|
Forwarded to the BasePipelineStep initializer. |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If name is empty. |
execute(state, runtime, config=None)
async
Execute this step, which does nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current state of the pipeline. |
required |
runtime
|
Runtime
|
Runtime information for this step's execution. |
required |
config
|
RunnableConfig | None
|
The runnable configuration. Defaults to None. |
None
|
RaiseStepErrorHandler()
Bases: BaseStepErrorHandler
Strategy that raises exceptions with enhanced context.
Initialize the raise error handler.
StateOperatorStep(name, input_states=None, output_state=None, operation=None, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that performs an operation on the pipeline state and updates it.
This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
input_map |
dict[str, str | Val] | None
|
Unified input map. |
output_state |
str | list[str]
|
Key(s) to store the operation result in the pipeline state. |
operation |
Callable[[dict[str, Any]], Any]
|
The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. |
Initializes a new StateOperatorStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A unique identifier for this pipeline step. |
required |
input_states
|
list[str]
|
Keys of the state data required by the operation. Defaults to None. |
None
|
output_state
|
str | list[str]
|
Key(s) to store the operation result in the pipeline state. Defaults to None. |
None
|
operation
|
Callable[[dict[str, Any]], Any]
|
The operation to execute. It should accept a dictionary of input data and return the operation result. Defaults to None. |
None
|
input_map
|
InputMapSpec | None
|
Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
execute(state, runtime, config=None)
async
Executes the operation and processes its output.
This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current state of the pipeline, containing all data. |
required |
runtime
|
Runtime
|
Runtime information for this step's execution. |
required |
config
|
RunnableConfig | None
|
The runnable configuration. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If an error occurs during operation execution. |
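The input extraction and output formatting performed by this step can be sketched as follows. The helper and sample data are illustrative; the real step also handles runtime configuration, Val fixed arguments, and async operations.

```python
def run_state_operator(state, input_states, output_state, operation):
    # Extract only the requested keys from the state, run the operation on
    # them, and return a state *update* rather than the entire state.
    inputs = {key: state[key] for key in input_states}
    result = operation(inputs)
    if isinstance(output_state, str):
        return {output_state: result}
    # Multiple output keys: the operation is expected to return one value per key.
    return dict(zip(output_state, result))


update = run_state_operator(
    {"a": 2, "b": 3, "ignored": "x"},
    input_states=["a", "b"],
    output_state="total",
    operation=lambda data: data["a"] + data["b"],
)
# update == {"total": 5}
```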
SubgraphStep(name, subgraph, output_state_map=None, input_map=None, retry_config=None, error_handler=None, cache=None)
Bases: BaseCompositeStep, HasInputsMixin
A pipeline step that executes another pipeline as a subgraph.
This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
subgraph |
Pipeline
|
The pipeline to be executed as a subgraph. |
input_map |
dict[str, str | Val] | None
|
Unified input map. |
output_state_map |
dict[str, str]
|
Mapping of parent pipeline state keys to subgraph output keys. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. |
Initializes a new SubgraphStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A unique identifier for this pipeline step. |
required |
subgraph
|
Pipeline
|
The pipeline to be executed as a subgraph. |
required |
output_state_map
|
dict[str, str] | None
|
Mapping of parent pipeline state keys to subgraph output keys. Defaults to None. |
None
|
input_map
|
InputMapSpec | None
|
Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Error handler to be used for this step. If None, no error handler is applied. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
apply_exclusions(exclusions)
Apply exclusions to this subgraph and its children.
A subgraph requires no internal structural changes of its own: this step marks itself and uniformly propagates child exclusions to all steps of the subgraph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
exclusions
|
ExclusionSet
|
The exclusion set to apply. |
required |
execute(state, runtime, config=None)
async
Executes the subgraph and processes its output.
This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current state of the pipeline, containing all data. |
required |
runtime
|
Runtime
|
Runtime information for this step's execution. |
required |
config
|
RunnableConfig | None
|
The runnable configuration. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
dict[str, Any]: The update to the pipeline state after this step's operation. This includes new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If an error occurs during subgraph execution, with details about which step caused the error. |
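The state mapping between parent and subgraph can be sketched in plain Python. The key directions follow the documented attributes (input_map: subgraph key -> parent key; output_state_map: parent key -> subgraph output key); the toy subgraph and helper names are illustrative.

```python
def run_subgraph_step(parent_state, input_map, output_state_map, subgraph):
    # Build the subgraph input from the parent state, skipping keys that are
    # not present so missing data does not raise an error.
    sub_input = {
        sub_key: parent_state[parent_key]
        for sub_key, parent_key in input_map.items()
        if parent_key in parent_state
    }
    sub_result = subgraph(sub_input)
    # Map subgraph outputs back onto parent keys; an absent output becomes None.
    return {
        parent_key: sub_result.get(sub_key)
        for parent_key, sub_key in output_state_map.items()
    }


def toy_subgraph(state):
    # Stand-in for a compiled pipeline invoked as a subgraph.
    return {"sub_answer": state["sub_query"].upper()}


update = run_subgraph_step(
    {"query": "hi", "extra": 1},
    input_map={"sub_query": "query"},
    output_state_map={"answer": "sub_answer", "missing": "not_there"},
    subgraph=toy_subgraph,
)
# update == {"answer": "HI", "missing": None}
```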
TerminatorStep(name, retry_config=None, error_handler=None, cache=None)
Bases: BasePipelineStep
A step that connects previous steps to the END node.
This step is useful when you want to explicitly terminate a branch or the entire pipeline. It has no processing logic and simply acts as a connection point to the END node.
Examples:
pipeline = (
step_a
| ConditionalStep(
name="branch",
branches={
"terminate": TerminatorStep("early_end"),
"continue": step_b
},
condition=lambda x: "terminate" if x["should_stop"] else "continue"
)
| step_c
)
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. |
add_to_graph(graph, previous_endpoints, retry_policy=None)
Adds this step to the graph and connects it to the END node.
This method is used by Pipeline to manage the pipeline's execution flow.
It should not be called directly by users.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
graph
|
StateGraph
|
The graph to add this step to. |
required |
previous_endpoints
|
list[str]
|
The endpoints from previous steps to connect to. |
required |
retry_policy
|
RetryPolicy | None
|
Configuration for retry behavior using LangGraph's RetryPolicy. If None, the retry policy of the step is used. If the step is not a retryable step, this parameter is ignored. |
None
|
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: Empty list as this step has no endpoints (it terminates the flow). |
execute(state, runtime, config=None)
async
Executes this step, which does nothing but pass through the state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current pipeline state. |
required |
runtime
|
Runtime
|
The runtime information. |
required |
config
|
RunnableConfig | None
|
The runnable configuration. Defaults to None. |
None
|
WhileDoStep(name, body, condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache=None)
Bases: BaseCompositeStep, HasInputsMixin
A pipeline step that executes a do-while loop construct.
The loop body executes at least once and a condition node appended at the end determines whether to jump back to the loop's entry point.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
A unique identifier for the pipeline step. |
body |
BasePipelineStep | list[BasePipelineStep]
|
The step(s) to execute in the loop body. |
condition |
LoopConditionType
|
The condition to evaluate after each iteration. |
input_map |
InputMapSpec | None
|
Unified input map for the condition. |
output_state |
str | None
|
Key to store the condition result in the pipeline state. |
retry_policy |
RetryPolicy | None
|
Configuration for retry behavior. |
error_handler |
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. |
Initialize a WhileDoStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A unique identifier for this pipeline step. |
required |
body
|
BasePipelineStep | list[BasePipelineStep]
|
The loop body to execute. |
required |
condition
|
LoopConditionType
|
The condition to evaluate at the end of each iteration. |
required |
input_map
|
InputMapSpec | None
|
Unified input map for the condition. Defaults to None. |
None
|
output_state
|
str | None
|
Key to store the condition result in the state. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Retry config applied to the condition node. Defaults to None. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Error handling strategy for condition. Defaults to None. |
None
|
cache
|
CacheConfig | None
|
Caching config for the condition. Defaults to None. |
None
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If body is empty or condition is invalid. |
apply_exclusions(exclusions)
Apply exclusions to this composite step and its children.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
exclusions
|
ExclusionSet
|
The exclusion set to apply. |
required |
execute(state, runtime, config=None)
async
Evaluate the condition and route back or exit the loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
PipelineState
|
The current state of the pipeline workflow. |
required |
runtime
|
Runtime
|
The runtime context of the pipeline workflow execution. |
required |
config
|
RunnableConfig | None
|
Execution configuration for the node. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
Command | dict[str, Any] | None
|
Command | dict[str, Any] | None: A LangGraph command directing execution back to the loop entry, an update payload when the loop exits, or None when there is no state update. |
execute_direct(state, runtime, config=None)
async
Reject direct execution for WhileDoStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
dict[str, Any]
|
The initial state data. |
required |
runtime
|
Runtime
|
The runtime context of the pipeline execution. |
required |
config
|
RunnableConfig | None
|
Configuration for the runnable. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
dict[str, Any] | None: This method does not return normally. |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
Always raised since loop routing requires a graph. |
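The do-while control flow implemented by this step can be reduced to a few lines of plain Python. In the real step the loop-back is a LangGraph Command rather than a while loop, and the body/condition here are illustrative lambdas:

```python
def while_do(body, condition, state):
    # Do-while semantics: the body always runs at least once, then the
    # condition decides whether to jump back to the loop's entry point.
    while True:
        update = body(state)
        if update:
            state.update(update)
        if not condition(state):
            return state


state = while_do(
    body=lambda s: {"count": s["count"] + 1},
    condition=lambda s: s["count"] < 3,
    state={"count": 0},
)
# state == {"count": 3}
```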
bundle(input_states, output_state, retry_config=None, error_handler=None, cache=None, name=None)
Create a StateOperatorStep to combine multiple state keys.
This function creates a StateOperatorStep that combines multiple keys from the pipeline state into a single output without modifying the data.
Examples:
bundle_step = bundle(["input1", "input2"], "output")
# Produces: {"output": {"input1": state["input1"], "input2": state["input2"]}}
This will cause the step to bundle the values of input1 and input2 from the pipeline state into a single
dictionary. The result will be stored in the pipeline state under the key output.
With remapping:
# Provide a mapping of desired output field names to source state keys
# Renames state key "user_id" to "id" in the bundled output
bundle_step = bundle({"id": "user_id"}, "output")
# Produces: {"output": {"id": state["user_id"]}}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input_states
|
list[str] | dict[str, str]
|
Keys of the state to bundle. May be a list of state keys, or a dict mapping output field names to source state keys. |
required |
output_state
|
str | list[str]
|
Key(s) to store the bundled data in the pipeline state. |
required |
error_handler
|
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
name
|
str | None
|
A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Bundle" followed by a unique identifier. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
StateOperatorStep |
StateOperatorStep
|
An instance of StateOperatorStep configured to bundle the input states. |
copy(input_state, output_state, retry_config=None, error_handler=None, name=None, cache=None)
Create a step that copies values from input state keys to output state keys.
This function creates a StateOperatorStep that copies data from input state(s) to output state(s) without any transformation. The function handles different scenarios: 1. Single input to single output: Direct copy 2. Single input to multiple outputs: Broadcast the input to all outputs 3. Multiple inputs to single output: Pack all inputs into a list 4. Multiple inputs to multiple outputs: Copy each input to corresponding output (must have same length)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input_state
|
str | list[str]
|
Input state key(s) to copy from. |
required |
output_state
|
str | list[str]
|
Output state key(s) to copy to. |
required |
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
name
|
str | None
|
A unique identifier for this step. Defaults to None, in which case the name will be "Copy" followed by a unique identifier. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
StateOperatorStep |
StateOperatorStep
|
An instance of StateOperatorStep configured to copy the input states to output states. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If both input_state and output_state are lists but have different lengths. |
Examples:
Single input to single output:
step = copy("input_data", "output_data")
# Copies value from "input_data" key to "output_data" key
Single input to multiple outputs (broadcast):
step = copy("input_data", ["output1", "output2", "output3"])
# Copies value from "input_data" to all three output keys
Multiple inputs to single output (pack):
step = copy(["input1", "input2", "input3"], "packed_output")
# Packs values from all three input keys into a list at "packed_output"
Multiple inputs to multiple outputs (pairwise):
step = copy(["input1", "input2"], ["output1", "output2"])
# Copies "input1" → "output1" and "input2" → "output2"
With custom name and retry config:
from gllm_core import RetryConfig
retry_cfg = RetryConfig(max_attempts=3, delay=1.0)
step = copy(
"input_data",
"output_data",
name="DataCopyStep",
retry_config=retry_cfg
)
goto(name, target, input_map=None, output_state=None, allow_end=True, retry_config=None, error_handler=None, cache=None)
Create a GoToStep for dynamic flow control.
This convenience function creates a GoToStep that enables dynamic flow control by jumping to specified nodes in the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A unique identifier for this pipeline step. |
required |
target
|
Val | str | Callable
|
The jump target specification. 1. Val: Static target (unwrapped value) 2. str: State key to read target from 3. Callable: Function that computes the target |
required |
input_map
|
InputMapSpec | None
|
Input mapping for callable targets. Only used when target is a callable. Defaults to None. |
None
|
output_state
|
str | None
|
Optional key to store the resolved target in state. If provided, the resolved target will be saved to this state key. Defaults to None. |
None
|
allow_end
|
bool
|
Whether jumping to "END" is allowed. If False and target resolves to "END", a ValueError is raised. Defaults to True. |
True
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior. Defaults to None. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Strategy for error handling. Defaults to None. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
GoToStep |
GoToStep
|
A configured GoToStep instance. |
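The three target forms accepted by goto() can be sketched as a small resolver. The Val class below is an illustrative stand-in for the real Val wrapper, and the resolver omits the input_map handling for callable targets:

```python
class Val:
    # Illustrative stand-in for the real Val wrapper: marks a static target.
    def __init__(self, value):
        self.value = value


def resolve_target(target, state, allow_end=True):
    if isinstance(target, Val):
        resolved = target.value   # 1. static target (unwrapped value)
    elif callable(target):
        resolved = target(state)  # 3. computed from the state
    else:
        resolved = state[target]  # 2. read from a state key
    if resolved == "END" and not allow_end:
        raise ValueError("Target resolved to END, but allow_end is False.")
    return resolved
```

For example, `resolve_target(Val("node_a"), {})` yields "node_a", while `resolve_target("next_node", state)` reads the target from the "next_node" state key.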
guard(condition, success_branch, failure_branch=None, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a GuardStep with a concise syntax.
This function creates a GuardStep that can terminate pipeline execution if a condition is not met.
Examples:
auth_check = lambda state: state["is_authenticated"]
success_step = step(SuccessHandler(), {"input": "input"}, "output")
error_step = step(ErrorHandler(), {"error": "auth_error"}, "error_message")
guard_step = guard(
auth_check,
success_branch=success_step,
failure_branch=error_step,
input_map={"user_id": "current_user", "model": "auth_model", "strict_mode": Val(True)},
output_state="auth_result",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
condition
|
ConditionType | Callable[[dict[str, Any]], bool]
|
The condition to evaluate. |
required |
success_branch
|
BasePipelineStep | list[BasePipelineStep]
|
Steps to execute if condition is True. |
required |
failure_branch
|
BasePipelineStep | list[BasePipelineStep] | None
|
Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None. |
None
|
output_state
|
str | None
|
Key to store the condition result in the pipeline state. Defaults to None. |
None
|
input_map
|
InputMapSpec
|
Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
name
|
str | None
|
A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Guard" followed by the condition's function name and a unique identifier. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
GuardStep |
GuardStep
|
An instance of GuardStep configured with the provided parameters. |
if_else(condition, if_branch, else_branch, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a simple ConditionalStep with two branches.
This function creates a ConditionalStep that executes one of two branches based on a condition.
The condition can be either: 1. A Component that must return exactly "true" or "false" 2. A callable that returns a string ("true" or "false", case insensitive) 3. A callable that returns a boolean (will be converted to "true"/"false")
For boolean conditions and string conditions, True/true/TRUE maps to the if_branch and False/false/FALSE maps to the else_branch.
Examples:
With a Callable condition:
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["input"] > data["threshold"]
if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")
if_else_step = if_else(
condition,
if_branch,
else_branch,
output_state="condition_result",
input_map={"threshold": Val(0)}
)
This will cause the step to execute the PositiveComponent if the input in the pipeline state is greater than
the threshold (0), and the NegativeComponent otherwise. The result of the condition will be stored in the
pipeline state under the key condition_result.
With a Component condition:
# Using a Component condition - requires input_map for specifying inputs
threshold_checker = ThresholdChecker() # A Component that returns "true" or "false"
if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")
if_else_step = if_else(
threshold_checker,
if_branch,
else_branch,
output_state="condition_result",
input_map={
"value": "input",
"threshold": "threshold_config",
"strict_mode": Val(True),
}
)
This will cause the step to execute the ThresholdChecker component with the input from the pipeline state
as its value parameter and the threshold_config from runtime configuration as its threshold parameter.
Based on the component's result ("true" or "false"), it will execute either the PositiveComponent or
the NegativeComponent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
condition
|
ConditionType | Callable[[dict[str, Any]], bool]
|
The condition to evaluate. |
required |
if_branch
|
BasePipelineStep | list[BasePipelineStep]
|
Step(s) to execute if condition is true. |
required |
else_branch
|
BasePipelineStep | list[BasePipelineStep]
|
Step(s) to execute if condition is false. |
required |
output_state
|
str | None
|
Key to store the condition result in the pipeline state. Defaults to None. |
None
|
input_map
|
InputMapSpec
|
Direct unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
error_handler
|
BaseStepErrorHandler | None
|
Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
name
|
str | None
|
A unique identifier for this pipeline step. Defaults to None, in which case the name will be "IfElse" followed by the condition's function name and a unique identifier. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
ConditionalStep |
ConditionalStep
|
An instance of ConditionalStep configured with the provided parameters. |
interrupt(message='Pipeline interrupted.', resume_value_map=None, name=None)
Create a specialized step that pauses pipeline execution.
This function creates an InterruptStep that leverages LangGraph's native interrupt behavior to halt execution and return control to the caller.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str | dict[str, Any]
|
The payload to emit when paused. Defaults to "Pipeline interrupted.". |
'Pipeline interrupted.'
|
resume_value_map
|
str | list[str] | None
|
State key(s) where the resumed value will be stored. Defaults to None. |
None
|
name
|
str | None
|
A unique identifier for this pipeline step. Defaults to None. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
InterruptStep |
InterruptStep
|
An instance of InterruptStep. |
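The resume_value_map behavior can be sketched as follows: after the caller resumes the pipeline, the resumed value is written back into the state under the configured key(s). The helper name and sample values are illustrative:

```python
def apply_resume_value(resume_value, resume_value_map):
    # No mapping configured: the resumed value is not stored in the state.
    if resume_value_map is None:
        return {}
    # A single key stores the value once; a list broadcasts it to every key.
    if isinstance(resume_value_map, str):
        return {resume_value_map: resume_value}
    return {key: resume_value for key in resume_value_map}


update = apply_resume_value("approved", "review_decision")
# update == {"review_decision": "approved"}
```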
log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache=None)
Create a specialized step for logging messages.
This function creates a LogStep that logs messages within a pipeline. It can be used to log status updates, debugging information, or any other text during pipeline execution.
The message can be a plain string or a template with placeholders for state variables.
Examples:
Plain message:
log_step = log("Processing document", is_template=False)
Template message with state variables:
log_step = log("Processing query: {query} with model: {model_name}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str
|
The message to be logged. May contain placeholders in curly braces for state variables. |
required |
is_template
|
bool
|
Whether the message is a template with placeholders. Defaults to True. |
True
|
emit_kwargs
|
dict[str, Any] | None
|
Additional keyword arguments to pass to the event emitter. Defaults to None. |
None
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. |
None
|
cache
|
CacheConfig | None
|
Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. |
None
|
name
|
str | None
|
A unique identifier for this pipeline step. If None, a name will be auto-generated with the prefix "log_". Defaults to None. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
LogStep |
LogStep
|
A specialized pipeline step for logging messages. |
map_reduce(output_state, map_func, reduce_func=lambda results: results, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a MapReduceStep that maps a function over multiple inputs and reduces the results.
This function creates a step that applies a mapping function to multiple inputs in parallel and combines the results using a reduction function.
The map_func can be either:
1. A callable function that takes a dictionary as input and returns a result.
2. A Component instance, which will be executed with proper async handling.
Important note on parallel execution:
1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.
The step supports automatic broadcasting of scalar values and handles lists appropriately: 1. If multiple list inputs are provided, they must be the same length. 2. Scalar inputs are broadcasted to match list lengths.
Examples:
Processing a list of items with an async map function:

```python
async def count_words(item):
    await asyncio.sleep(0.1)  # Simulate an I/O operation
    return len(item["document"].split())

process_docs = map_reduce(
    output_state="word_counts",  # The total word count across all documents
    map_func=count_words,
    input_map=[{"document": "documents"}],  # "documents" is a list, e.g. ["doc1...", "doc2...", "doc3..."]
    reduce_func=lambda results: sum(results),  # Sum the word counts
)
# When executed with {"documents": ["doc1...", "doc2...", "doc3..."]},
# returns {"word_counts": 60} (total word count)
```

Broadcasting scalar values to match list length:

```python
# Apply a common threshold to multiple values
threshold_check = map_reduce(
    output_state="above_threshold",
    map_func=lambda item: item["value"] > item["threshold"],
    input_map=[{"value": "values", "threshold": "threshold"}],  # "values" is a list, "threshold" is a scalar
    reduce_func=lambda results: results,  # Return the list of boolean results
)
# When executed with {"values": [5, 10, 15], "threshold": 8},
# returns {"above_threshold": [False, True, True]}
```

Multiple list inputs with the same length:

```python
similarity_step = map_reduce(
    output_state="similarity_scores",
    map_func=lambda item: calculate_similarity(item["doc1"], item["doc2"]),
    input_map=[{"doc1": "documents_a", "doc2": "documents_b"}],  # Both lists of the same length
    reduce_func=lambda results: sum(results) / len(results),  # Average similarity
)
# When executed with {"documents_a": ["doc1", "doc2", "doc3"], "documents_b": ["docA", "docB", "docC"]},
# returns {"similarity_scores": 0.75}
```

Using a Component for complex processing instead of a map function:

```python
summarizer = TextSummarizer()  # A Component subclass
summarize_step = map_reduce(
    output_state="summaries",
    map_func=summarizer,
    input_map=[{"text": "documents", "max_length": "max_length"}],  # List of documents + scalar param
    reduce_func=lambda results: [r["summary"] for r in results],
)
# When executed with {"documents": ["doc1...", "doc2..."], "max_length": 50},
# returns {"summaries": ["summary1...", "summary2..."]}
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output_state | str | Key to store the reduced result in the pipeline state. | required |
| map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_map. | required |
| reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. Defaults to a function that returns the list of results as is. | lambda results: results |
| input_map | InputMapSpec | Direct unified input map. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this step. Defaults to None, in which case the name will be "MapReduce" followed by the map function name. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| MapReduceStep | MapReduceStep | An instance of MapReduceStep configured with the provided parameters. |
no_op(name=None)
Create a NoOpStep to add a step that does nothing.
This function creates a NoOpStep, a pass-through step that performs no operation and leaves the pipeline state unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "NoOp" followed by a unique identifier. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| NoOpStep | NoOpStep | An instance of NoOpStep. |
parallel(branches, input_states=None, squash=True, input_map=None, error_handler=None, cache=None, retry_config=None, name=None)
Create a ParallelStep that executes multiple branches concurrently.
This function creates a ParallelStep that runs multiple branches in parallel and merges their results. Each branch can be a single step or a list of steps to execute sequentially.
The step supports two execution modes controlled by the squash parameter:
1. Squashed (default): uses asyncio.gather() to run branches in parallel within a single LangGraph node. Offers better raw performance, a simpler implementation, and less overhead, but is less transparent for debugging and tracing.
2. Expanded (squash=False): creates a native LangGraph structure with multiple parallel paths. Offers more native LangGraph integration and is more transparent for debugging and tracing.
For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
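Conceptually, squashed execution behaves like the sketch below (a hypothetical helper; the branch callables and last-writer-wins merge are simplifications of the real step):

```python
import asyncio

async def run_branches_squashed(branches, state):
    # Run every branch concurrently on a copy of the state, then merge
    # the partial result dicts into one state update. In this sketch,
    # later branches win on key conflicts.
    results = await asyncio.gather(*(branch(dict(state)) for branch in branches))
    merged = {}
    for result in results:
        merged.update(result)
    return merged
```

Because all branches share one node, a single slow branch delays the merged update, but no extra graph structure is created.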
Examples:
Define branches as a list of steps or lists of steps:

```python
parallel_step = parallel(
    branches=[
        step(ComponentA(), input_map={"input": "query"}, output_state="output_a"),
        [
            step(ComponentB1(), input_map={"input": "query"}, output_state="output_b1"),
            step(ComponentB2(), input_map={"input": "output_b1"}, output_state="output_b2"),
        ],
        step(ComponentC(), input_map={"input": "query"}, output_state="output_c"),
    ],
    input_states=["query"],  # Only "query" will be passed to the branches
)
```

Define branches as a dictionary: besides the list format, branches can also be given as a dictionary, which makes it easier to exclude individual branches.

```python
parallel_step = parallel(
    branches={
        "branch_a": step(ComponentA(), input_map={"input": "query"}, output_state="output_a"),
        "branch_b": step(ComponentB(), input_map={"input": "query"}, output_state="output_b"),
    },
    input_states=["query"],
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_states | list[str] \| None | Keys from the state to pass to branches. Defaults to None, in which case all state keys will be passed. | None |
| squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel. If False, uses native LangGraph structures for parallelism. Defaults to True. | True |
| input_map | InputMapSpec | Direct unified input map. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this parallel step. Defaults to None, in which case a name will be auto-generated. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| ParallelStep | ParallelStep | An instance of ParallelStep configured with the provided branches. |
pause(name=None)
Create a named marker step for static breakpoint targeting.
A PauseStep does not pause execution by itself. Its name is
meant to be referenced in the interrupt_before or interrupt_after
parameters of Pipeline.invoke().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | A unique identifier for this step. Defaults to None, in which case a name is auto-generated. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| PauseStep | PauseStep | An instance of PauseStep. |
step(component, output_state=None, input_map=None, retry_config=None, error_handler=None, name=None, cache=None)
Create a ComponentStep with a concise syntax.
This function creates a ComponentStep, which wraps a component and manages its inputs and outputs within the pipeline.
Examples:
We can leverage the input_map parameter to specify both state/config keys (as strings) and fixed values (wrapped in Val) in a single dictionary.

```python
retriever = Retriever()
retriever_step = step(
    retriever,
    input_map={
        "query": "user_input",
        "top_k": "config_top_k",
        "conversation_id": Val("<fixed_value>"),
    },
    output_state="retrieved_data",
)
```

This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the user_input from the pipeline state to the query argument of the Retriever.
2. It will pass the config_top_k from the runtime configuration to the top_k argument of the Retriever.
3. It will pass the fixed value <fixed_value> to the conversation_id argument of the Retriever.
4. It will store the Retriever result in the pipeline state under the key retrieved_data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | Component | The component to be executed in this step. | required |
| output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None. | None |
| input_map | InputMapSpec | Direct unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be the component's class name followed by a unique identifier. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| ComponentStep | ComponentStep | An instance of ComponentStep configured with the provided parameters. |
subgraph(subgraph, output_state_map=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a SubgraphStep that executes another pipeline as a subgraph.
This function creates a SubgraphStep that allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
The SubgraphStep gracefully handles missing state keys - if a key specified in input_map is not present
in the parent state, it will be omitted from the subgraph input rather than causing an error. This allows
for flexible composition of pipelines with different state schemas.
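The graceful handling of missing keys can be sketched as follows (`build_subgraph_input` is an illustrative, hypothetical helper, not the library's exact code):

```python
def build_subgraph_input(parent_state: dict, input_map: dict) -> dict:
    # Keys mapped from the parent state are copied only when present;
    # absent keys are silently omitted instead of raising a KeyError.
    return {
        subgraph_key: parent_state[parent_key]
        for subgraph_key, parent_key in input_map.items()
        if parent_key in parent_state
    }
```

For example, if the parent state lacks the "retrieval_model" key, the subgraph simply receives no "model" input rather than failing.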
Examples:

```python
from gllm_pipeline.utils.typing_compat import TypedDict
from gllm_pipeline.pipeline.pipeline import Pipeline

# Define state schemas using TypedDict
class SubgraphState(TypedDict):
    query: str
    retrieved_data: list
    reranked_data: list

class ParentState(TypedDict):
    user_input: str
    query: str
    reranked: list
    response: str

# Define a subgraph pipeline with its own state schema
subgraph_pipeline = Pipeline(
    [
        step(Retriever(), input_map={"query": "query"}, output_state="retrieved_data"),
        step(Reranker(), input_map={"data": "retrieved_data"}, output_state="reranked_data"),
    ],
    state_type=SubgraphState,
)

# Use the subgraph in a parent pipeline
parent_pipeline = Pipeline(
    [
        step(QueryProcessor(), input_map={"input": "user_input"}, output_state="query"),
        subgraph(
            subgraph_pipeline,
            input_map={"query": "query", "model": "retrieval_model", "top_k": Val(10)},
            output_state_map={"reranked": "reranked_data"},
        ),
        step(ResponseGenerator(), input_map={"data": "reranked"}, output_state="response"),
    ],
    state_type=ParentState,
)

# When the parent pipeline runs:
# 1. QueryProcessor processes user_input and produces query
# 2. SubgraphStep creates a new state for the subgraph with query from parent
# 3. Subgraph executes its steps (Retriever → Reranker)
# 4. SubgraphStep maps reranked_data from subgraph to reranked in parent
# 5. ResponseGenerator uses reranked to produce response
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subgraph | Pipeline | The pipeline to be executed as a subgraph. | required |
| output_state_map | dict[str, str] \| None | Mapping of parent pipeline state keys to subgraph output keys. If None, all subgraph outputs will be passed as-is. | None |
| input_map | InputMapSpec \| None | Direct unified input map. If provided, it is used directly. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| SubgraphStep | SubgraphStep | An instance of SubgraphStep configured with the provided parameters. |
switch(condition, branches, output_state=None, default=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a ConditionalStep with multiple branches.
This function creates a ConditionalStep that can execute one of multiple branches based on a condition.
Examples:
With a Callable condition:

```python
# Using a Callable condition - receives merged state and config directly
def extract_command(data):
    # Access both state and config in a single dictionary
    query = data["query"]
    separator = data["separator"]  # From runtime config or state
    return query.split(separator)[0]

branches = {
    "search": step(SearchComponent(), input_map={"query": "query"}, output_state="search_result"),
    "filter": step(FilterComponent(), input_map={"query": "query"}, output_state="filter_result"),
}
default = step(NoOpComponent(), input_map={}, output_state="no_op_result")
switch_step = switch(
    extract_command,
    branches,
    input_map={"separator": Val(" ")},
    output_state="command_type",
    default=default,
)
```

This will cause the step to execute the SearchComponent if the first part of the query in the pipeline state is "search", the FilterComponent if it is "filter", and the NoOpComponent otherwise. The separator is provided as a fixed argument. The result of the condition will be stored in the pipeline state under the key command_type.

With a Component condition:

```python
# Using a Component condition - requires input_map for specifying inputs
command_extractor = CommandExtractor()  # A Component that extracts the command from the query
branches = {
    "search": step(SearchComponent(), input_map={"query": "query"}, output_state="search_result"),
    "filter": step(FilterComponent(), input_map={"query": "query"}, output_state="filter_result"),
    "sort": step(SortComponent(), input_map={"query": "query"}, output_state="sort_result"),
}
default = step(DefaultComponent(), input_map={"query": "query"}, output_state="default_result")
switch_step = switch(
    command_extractor,
    branches,
    input_map={"text": "query", "delimiter": "separator_config", "lowercase": Val(True)},
    output_state="command_type",
    default=default,
)
```

This will cause the step to execute the CommandExtractor component with the query from the pipeline state as its text parameter and the separator_config from runtime configuration as its delimiter parameter. Based on the component's result (which should be one of "search", "filter", "sort", or something else), it will execute the corresponding branch component or the default component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | ConditionType | The condition to evaluate for branch selection. | required |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps to execute. | required |
| output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
| default | BasePipelineStep \| list[BasePipelineStep] \| None | Default branch to execute if no condition matches. Defaults to None. | None |
| input_map | InputMapSpec | Direct unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Switch" followed by the condition's function name and a unique identifier. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| ConditionalStep | ConditionalStep | An instance of ConditionalStep configured with the provided parameters. |
terminate(name=None, retry_config=None, error_handler=None, cache=None)
Create a TerminatorStep to end pipeline execution.
This function creates a TerminatorStep that explicitly terminates a branch or the entire pipeline.
Examples:

```python
early_exit = terminate("early_exit")
pipeline = (
    step_a
    | if_else(should_stop, early_exit, step_b)
    | step_c
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Terminator" followed by a unique identifier. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| TerminatorStep | TerminatorStep | An instance of TerminatorStep. |
toggle(condition, if_branch, output_state=None, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a ConditionalStep that toggles between a branch and a no-op.
This function creates a ConditionalStep that executes a branch if the condition is true, and does nothing (no-op) if the condition is false.
The condition can be:
1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (converted to "true"/"false").
4. A string key that will be looked up in the merged state data (state + runtime config + fixed args). The value is evaluated for truthiness: any non-empty, non-zero, non-False value is considered True.
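The four condition forms can be illustrated with a small normalization sketch (`evaluate_toggle_condition` is hypothetical; the library's internals may differ):

```python
def evaluate_toggle_condition(condition, data: dict) -> bool:
    # Forms 1-3: callables (or components) may return a boolean or a
    # "true"/"false" string, case insensitive.
    if callable(condition):
        result = condition(data)
        if isinstance(result, str):
            return result.lower() == "true"
        return bool(result)
    # Form 4: a plain string is treated as a key into the merged data
    # and evaluated for truthiness.
    return bool(data.get(condition))
```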
Examples:
With a Callable condition:

```python
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["feature_enabled"] and data["user_tier"] >= 2
feature_step = step(FeatureComponent(), input_map={"input": "input"}, output_state="output")
toggle_step = toggle(
    condition,
    feature_step,
    output_state="feature_status",
    input_map={"user_tier": Val(2)},
)
```

This will execute the FeatureComponent only if both feature_enabled is true and user_tier is at least 2. Otherwise, it will do nothing. The condition result will be stored in the pipeline state under the key feature_status.

With a Component condition:

```python
# Using a Component condition - requires input_map for specifying inputs
feature_checker = FeatureChecker()  # A Component that returns "true" or "false"
feature_step = step(FeatureComponent(), input_map={"input": "input"}, output_state="output")
toggle_step = toggle(
    feature_checker,
    feature_step,
    output_state="feature_status",
    input_map={"user_id": "current_user", "feature_name": "target_feature", "check_permissions": Val(True)},
)
```

This will cause the step to execute the FeatureChecker component with the current_user from the pipeline state as its user_id parameter and the target_feature from runtime configuration as its feature_name parameter. Based on the component's result ("true" or "false"), it will either execute the FeatureComponent or do nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | ConditionType \| Callable[[dict[str, Any]], bool] \| str | The condition to evaluate. | required |
| if_branch | BasePipelineStep \| list[BasePipelineStep] | Step(s) to execute if the condition is true. | required |
| output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
| input_map | InputMapSpec \| None | Direct unified input map. If provided, it is used directly. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Toggle" followed by a unique identifier. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| ConditionalStep | ConditionalStep | An instance of ConditionalStep configured with the provided parameters. |
transform(operation, input_states=None, *, output_state, input_map=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a StateOperatorStep for transforming state data.
This function creates a StateOperatorStep that applies a transformation operation to the pipeline state.
Note that the function operation should accept a dictionary of input data and return the operation result.
Examples:

```python
def sort(data: dict) -> list:
    is_reverse = data["reverse"]
    return sorted(data["chunk"], reverse=is_reverse)

transform_step = transform(
    operation=sort,
    input_map=["chunk", {"reverse": Val(True)}],
    output_state="output",
)
```

This will cause the step to execute the sort operation on the chunk in the pipeline state. The result will be stored in the pipeline state under the key output. The reverse flag is provided as a fixed argument via Val(True).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation | Callable[[dict[str, Any]], Any] | The operation to execute on the input data. | required |
| input_states | list[str] \| None | List of input state keys required by the operation. Defaults to None. | None |
| output_state | str \| list[str] | Key(s) to store the operation result in the pipeline state. This parameter is required. | required |
| input_map | InputMapSpec | Direct unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache | CacheConfig \| None | Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used. | None |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Transform" followed by the operation's function name and a unique identifier. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| StateOperatorStep | StateOperatorStep | An instance of StateOperatorStep configured with the provided parameters. |
while_do(body, condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache=None, name=None)
Create a WhileDoStep with a concise syntax.
This function creates a do-while loop construct in the pipeline graph. The loop body executes at least once, and a condition node at the end determines whether to loop back to the entry point.
The condition can be either:
1. A Component that must return exactly "true" or "false".
2. A callable that returns a boolean.
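The do-while semantics can be sketched in plain Python (`run_while_do` is a conceptual illustration, not the library's implementation):

```python
def run_while_do(body, condition, state: dict) -> dict:
    # The body always runs at least once; the condition is evaluated
    # afterwards and decides whether to loop back to the entry point.
    while True:
        state = body(state)
        if not condition(state):
            return state
```

For example, a body that increments a counter paired with the condition `lambda s: s["count"] < 3` runs exactly three times starting from zero.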
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| body | BasePipelineStep \| list[BasePipelineStep] | The loop body. Executes at least once. | required |
| condition | LoopConditionType | Evaluated after body execution. If truthy, jumps back to the loop entry; if falsy, exits the loop. | required |
| input_map | InputMapSpec \| None | Unified input map. Defaults to None. | None |
| output_state | str \| None | Key to store the condition result in the state. Defaults to None. | None |
| retry_config | RetryConfig \| None | Retry config for the condition. Defaults to None. | None |
| error_handler | BaseStepErrorHandler \| None | Error handler. Defaults to None. | None |
| cache | CacheConfig \| None | Caching configuration. Defaults to None. | None |
| name | str \| None | A unique identifier for this step. Defaults to None. | None |

Returns:
| Name | Type | Description |
|---|---|---|
| WhileDoStep | WhileDoStep | An instance of WhileDoStep. |