Steps
Contains the base classes and implementations of pipeline steps.
ComponentStep(name, component, input_state_map, output_state=None, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that executes a specific component.
This step wraps a component, manages its inputs and outputs, and integrates it into the pipeline.
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
component | Component | The component to be executed in this step. |
input_state_map | dict[str, str] | Mapping of component input arguments to pipeline state keys. |
output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. |
runtime_config_map | dict[str, str] \| None | Mapping of component input arguments to runtime configuration keys. |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the component. |
Initializes a new ComponentStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
component | Component | The component to be executed in this step. | required |
input_state_map | dict[str, str] | Mapping of component input arguments to pipeline state keys. Keys are input arguments expected by the component, values are corresponding state keys. | required |
output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of component input arguments to runtime configuration keys. Keys are input arguments expected by the component, values are runtime configuration keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the component. Defaults to None. | None |
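The input wiring described above can be illustrated with a small, self-contained sketch. This is plain Python, not library code; the state keys, mapping values, and the assumption that fixed_args are merged in after the mapped state values are all hypothetical:

```python
# Hypothetical pipeline state and mappings, illustrating how a
# ComponentStep is documented to assemble its component's arguments.
state = {"user_query": "hello", "docs": ["a", "b"]}

input_state_map = {"query": "user_query", "documents": "docs"}  # component arg -> state key
fixed_args = {"top_k": 3}                                       # passed through unchanged

# Look up each mapped state value, then merge in the fixed arguments.
component_kwargs = {arg: state[key] for arg, key in input_state_map.items()}
component_kwargs.update(fixed_args)

print(component_kwargs)
# {'query': 'hello', 'documents': ['a', 'b'], 'top_k': 3}
```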
execute(state, config)
async
Executes the component and processes its output.
This method validates inputs, prepares data, executes the component, and formats the output for integration into the pipeline state.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] \| None | The update to the pipeline state after this step's operation, or None if output_state is None. When not None, this includes new or modified data produced by the component, not the entire state. |

Raises:

Type | Description |
---|---|
RuntimeError | If an error occurs during component execution. |
TimeoutError | If the component execution times out. |
CancelledError | If the component execution is cancelled. |
ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A conditional pipeline step that conditionally executes different branches based on specified conditions.
This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.
A minimal usage requires defining the branches to execute based on a condition, which is a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs:

1. If the condition is a Component, input_state_map and runtime_config_map are used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. input_state_map and runtime_config_map are ignored in this case.
Example:

    ConditionalStep(
        name="UseCaseSelection",
        branches={"A": step_a, DEFAULT_BRANCH: step_b},
        condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
    )

This will execute step_a if the query contains "<A>", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute if no other condition matches. If DEFAULT_BRANCH is not defined and no condition matches, the step will raise an error.
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps. |
condition | list[ConditionType] \| None | The condition(s) to evaluate for branch selection. |
input_state_map | dict[str, str] \| None | A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. |
output_state | str \| None | Key to store the condition result in the state, if desired. |
condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. |
runtime_config_map | dict[str, str] \| None | A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition evaluation. |
Initializes a new ConditionalStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps to execute. | required |
condition | ConditionType \| list[ConditionType] \| None | The condition(s) to evaluate for branch selection. If a list is provided, all conditions are evaluated and their results are combined with condition_aggregator. | None |
input_state_map | dict[str, str] \| None | A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. | None |
output_state | str \| None | Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None. | None |
condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";"). | lambda x: ';'.join(str(i) for i in x) |
runtime_config_map | dict[str, str] \| None | A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None. | None |
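When multiple conditions are supplied, their results are joined into a single branch key by the aggregator. A quick sketch of the default aggregator, taken from the signature above — note that a branch key must then match the joined string:

```python
# Default aggregator from the ConditionalStep signature: joins the
# results of multiple conditions into one branch key string.
aggregator = lambda x: ";".join(str(i) for i in x)

print(aggregator(["A", True, 3]))
# A;True;3
```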
add_to_graph(graph, previous_endpoints)
Integrates this step into the pipeline's internal structure.
This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

This method always creates a condition node that:

1. Executes the condition and stores its result if output_state is defined
2. Acts as a pass-through router if no output_state is defined
3. Connects to the appropriate branch based on condition evaluation
Parameters:

Name | Type | Description | Default |
---|---|---|---|
graph | StateGraph | The internal representation of the pipeline structure. | required |
previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |

Returns:

Type | Description |
---|---|
list[str] | The exit points (endpoints) of all non-terminating branches. |
execute(state, config)
async
Executes the conditional step, determines the route, and returns a Command.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Name | Type | Description |
---|---|---|
Command | Command | A LangGraph Command object with 'goto' for routing and 'update' for state changes. |
execute_direct(state, config)
async
Execute this step directly, handling both branch selection and execution.
This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
get_all_steps()
Gets all steps from all branches.
Returns:

Type | Description |
---|---|
list[BasePipelineStep] | A list of all steps in all branches. |
get_mermaid_diagram()
Create a Mermaid diagram representation of the conditional step.
Returns:

Name | Type | Description |
---|---|---|
str | str | The complete Mermaid diagram representation of the conditional branches. |
select_path(state, config)
async
Determines the logical route key based on the evaluated condition(s).
This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Name | Type | Description |
---|---|---|
str | str | The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key. |
GuardStep(name, condition, success_branch, failure_branch=None, **kwargs)
Bases: ConditionalStep
A conditional step that can terminate pipeline execution if a condition is not met.
This step evaluates a condition and either:

1. Continues execution through the success_branch if the condition is True
2. Executes the failure_branch and terminates if the condition is False
Example:

    pipeline = (
        step_a
        | GuardStep(
            name="auth_check",
            condition=lambda x: x["is_authenticated"],
            success_branch=step_b,
            failure_branch=error_handling_step,
        )
        | step_c
    )
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
condition | ConditionType | The condition to evaluate. |
success_branch | BasePipelineStep \| list[BasePipelineStep] | Steps to execute if condition is True. |
failure_branch | BasePipelineStep \| list[BasePipelineStep] \| None | Steps to execute if condition is False. If None, pipeline terminates immediately on False condition. |
Initializes a new GuardStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
condition | ConditionType | The condition to evaluate. | required |
success_branch | BasePipelineStep \| list[BasePipelineStep] | Steps to execute if condition is True. | required |
failure_branch | BasePipelineStep \| list[BasePipelineStep] \| None | Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None. | None |
**kwargs | Any | Additional arguments to pass to ConditionalStep. | {} |
LogStep(name, message, is_template=True, emit_kwargs=None)
Bases: BasePipelineStep
A specialized pipeline step for logging messages.
This step uses the Messenger component to log messages during pipeline execution. It supports both plain text messages and template messages with placeholders for state variables.
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
messenger | Messenger | The messenger component used to format and send messages. |
emit_kwargs | dict[str, Any] | Additional arguments to pass to the event emitter. |
Initializes a new LogStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
message | str | The message to be logged; may contain placeholders enclosed in curly braces. | required |
is_template | bool | Whether the message contains placeholders. Defaults to True. | True |
emit_kwargs | dict[str, Any] \| None | Additional arguments to pass to the event emitter. Defaults to None. | None |
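A rough illustration of the template behavior described above, assuming standard curly-brace substitution over state keys; the actual Messenger implementation is not shown in this reference, so treat this as a sketch of the concept only:

```python
# Hypothetical template message with placeholders filled from the
# pipeline state (assumes str.format-style substitution).
message = "Processed {count} items for {user}"
state = {"count": 5, "user": "alice", "unused": "ignored"}

formatted = message.format(**state)
print(formatted)
# Processed 5 items for alice
```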
execute(state, config)
async
Executes the log step by formatting and emitting the message.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Name | Type | Description |
---|---|---|
None | None | This step does not modify the pipeline state. |

Raises:

Type | Description |
---|---|
RuntimeError | If an error occurs during message emission. |
MapReduceStep(name, input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A step that applies a mapping function to multiple inputs and reduces the results.
This step performs parallel processing of multiple input items using:

1. A map function that processes each input item independently. The map function receives a dictionary containing the input values for the current item (derived from input_state_map, runtime_config_map, and fixed_args).
2. A reduce function that combines all the mapped results.

Note on parallel execution:

1. For true parallelism, the map_func MUST be an async function or a Component.
2. Synchronous map functions will block the event loop and run sequentially.

Input handling:

1. Automatically detects which inputs are lists/sequences.
2. Ensures all list inputs have the same length.
3. Broadcasts scalar values to match list lengths.
4. If there are no list inputs, applies the map function once to the whole input.

Internally, this step uses asyncio.gather() for efficient parallel execution.
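The broadcasting and gathering behavior described above can be sketched in plain asyncio. This mimics the documented semantics with hypothetical inputs; it is not the library's implementation:

```python
import asyncio

# One list input and one scalar input; per the docs, the scalar
# is broadcast to match the list length.
inputs = {"x": [1, 2, 3], "factor": 10}

n = len(inputs["x"])
items = [{"x": inputs["x"][i], "factor": inputs["factor"]} for i in range(n)]

async def map_func(item):            # async, so items run concurrently
    return item["x"] * item["factor"]

async def run():
    mapped = await asyncio.gather(*(map_func(it) for it in items))
    return sum(mapped)               # a reduce_func that sums the results

total = asyncio.run(run())
print(total)
# 60
```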
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this step. |
map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. Will be run in parallel if the function is asynchronous. |
reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. |
input_state_map | dict[str, str] | Mapping of function arguments to pipeline state keys. |
output_state | str | Key to store the reduced result in the pipeline state. |
runtime_config_map | dict[str, str] \| None | Mapping of function arguments to runtime config keys. |
fixed_args | dict[str, Any] \| None | Fixed arguments to pass to the functions. |
Initialize a new MapReduceStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this step. | required |
input_state_map | dict[str, str] | Mapping of function arguments to pipeline state keys. | required |
output_state | str | Key to store the reduced result in the pipeline state. | required |
map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args. | required |
reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. Defaults to a function that returns the list of results as is. | lambda results: results |
runtime_config_map | dict[str, str] \| None | Mapping of arguments to runtime config keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to pass to the functions. Defaults to None. | None |
execute(state, config)
async
Execute the map and reduce operations.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] | The reduced result stored under output_state. |

Raises:

Type | Description |
---|---|
RuntimeError | If an error occurs during execution. |
NoOpStep(name)
Bases: BasePipelineStep
A step that does nothing.
This step is useful when you want to add a step that does not perform any processing. For example, you can use this step to implement a toggle pattern for a certain component.
Example:

    pipeline = (
        step_a
        | ConditionalStep(
            name="branch",
            branches={
                "execute": step_b,
                "continue": NoOpStep("no_op")
            },
            condition=lambda x: "execute" if x["should_execute"] else "continue"
        )
        | step_c
    )
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |

Initializes a new pipeline step.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for the pipeline step. | required |
execute(state, config)
async
Executes this step, which does nothing.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Name | Type | Description |
---|---|---|
None | None | This step does not modify the pipeline state. |
ParallelStep(name, branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that executes multiple branches in parallel.
This step wraps multiple branches and executes them concurrently, then merges their results. Each branch can be either a single step or a list of steps to be executed sequentially.
The step supports two execution modes controlled by the squash parameter:

1. Squashed (default): uses asyncio.gather() to run branches in parallel within a single LangGraph node. This gives better raw performance, a simpler implementation, and less overhead, but is less transparent for debugging and tracing.
2. Expanded (squash=False): creates a native LangGraph structure with multiple parallel paths. This gives more native LangGraph integration and is more transparent for debugging and tracing.

For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
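A self-contained sketch of the squashed mode's documented semantics — run branches concurrently, then merge their partial updates. The branch functions and state keys are hypothetical, and this is plain asyncio rather than the library's node implementation:

```python
import asyncio

# Two hypothetical branches, each returning a partial state update.
async def branch_a(state):
    return {"a_out": state["x"] + 1}

async def branch_b(state):
    return {"b_out": state["x"] * 2}

async def run_parallel(state):
    # Run both branches concurrently in a single "node", then merge.
    updates = await asyncio.gather(branch_a(state), branch_b(state))
    merged = {}
    for update in updates:
        merged.update(update)
    return merged

result = asyncio.run(run_parallel({"x": 3}))
print(result)
# {'a_out': 4, 'b_out': 6}
```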
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
branches | list[BasePipelineStep \| list[BasePipelineStep]] | The branches to execute in parallel. |
squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel, creating a single node. If False, uses native LangGraph structures for parallelism, creating multiple nodes. |
input_states | list[str] \| None | Keys from the state that should be passed to branches. If None, all state keys will be passed. |
Initialize a new ParallelStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
branches | list[BasePipelineStep \| list[BasePipelineStep]] | The branches to execute in parallel. Each branch can be a single step or a list of steps to execute sequentially. | required |
input_states | list[str] \| None | Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None. | None |
squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel, creating a single node. If False, uses native LangGraph structures for parallelism, creating multiple nodes. Defaults to True. | True |
runtime_config_map | dict[str, str] \| None | Mapping of input keys to runtime config keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used. | None |
add_to_graph(graph, previous_endpoints)
Add this step to the graph.
Uses either squashed or expanded approach based on the squash parameter.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
graph | StateGraph | The graph to add this step to. | required |
previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |

Returns:

Type | Description |
---|---|
list[str] | The endpoint(s) of this step. |
execute(state, config)
async
Execute all branches in parallel and merge their results.
This method is only used for the squashed approach. For the expanded approach, the execution is handled by the graph structure.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] \| None | The merged results from all parallel branches, or None if no updates were produced. |

Raises:

Type | Description |
---|---|
CancelledError | If execution is cancelled, preserved with added context. |
BaseInvokerError | If an error occurs during LM invocation. |
RuntimeError | For all other exceptions during execution, wrapped with context information. |
TimeoutError | If execution times out, preserved with added context. |
ValidationError | If input validation fails. |
get_mermaid_diagram()
Combines the base diagram and nested step diagrams into a complete visualization.
If the parallel step is squashed, custom logic is used to represent the structure. If the parallel step is not squashed, the structure is created natively using LangGraph nodes.

Returns:

Name | Type | Description |
---|---|---|
str | str | Complete Mermaid diagram for this parallel step. |
StateOperatorStep(name, input_states, output_state, operation, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that performs an operation on the pipeline state and updates it.
This step executes a given operation using selected data from the current pipeline state and runtime configuration, then updates the state with the operation's result.
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
input_states | list[str] | Keys of the state data required by the operation. |
output_state | str \| list[str] | Key(s) to store the operation result in the pipeline state. |
operation | Callable[[dict[str, Any]], Any] | The operation to execute. Accepts a dictionary of input data, which consists of the extracted state and runtime configuration. |
runtime_config_map | dict[str, str] \| None | Mapping of operation input arguments to runtime configuration keys. |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the operation. |
Initializes a new StateOperatorStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
input_states | list[str] | Keys of the state data required by the operation. | required |
output_state | str \| list[str] | Key(s) to store the operation result in the pipeline state. | required |
operation | Callable[[dict[str, Any]], Any] | The operation to execute. It should accept a dictionary of input data and return the operation result. | required |
runtime_config_map | dict[str, str] \| None | Mapping of operation input arguments to runtime configuration keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the operation. Defaults to None. | None |
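A minimal sketch of the documented input/operation/output flow, in plain Python with hypothetical state keys and operation:

```python
# Hypothetical pipeline state for a StateOperatorStep-like flow.
state = {"chunks": ["a", "b"], "scores": [0.9, 0.1], "other": "ignored"}
input_states = ["chunks", "scores"]

# The operation receives only the extracted input data.
operation = lambda data: dict(zip(data["chunks"], data["scores"]))

inputs = {key: state[key] for key in input_states}
result = operation(inputs)
print(result)
# {'a': 0.9, 'b': 0.1}
```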
execute(state, config)
async
Executes the operation and processes its output.
This method validates inputs, prepares data, executes the operation, and formats the output for integration into the pipeline state.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] | The update to the pipeline state after this step's operation. This includes new or modified data produced by the operation, not the entire state. |

Raises:

Type | Description |
---|---|
RuntimeError | If an error occurs during operation execution. |
SubgraphStep(name, subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep, HasInputsMixin
A pipeline step that executes another pipeline as a subgraph.
This step allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |
subgraph | Pipeline | The pipeline to be executed as a subgraph. |
input_state_map | dict[str, str] | Mapping of subgraph input keys to parent pipeline state keys. |
output_state_map | dict[str, str] | Mapping of parent pipeline state keys to subgraph output keys. |
runtime_config_map | dict[str, str] \| None | Mapping of subgraph input keys to runtime configuration keys. |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the subgraph. |
Initializes a new SubgraphStep.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for this pipeline step. | required |
subgraph | Pipeline | The pipeline to be executed as a subgraph. | required |
input_state_map | dict[str, str] | Mapping of subgraph input keys to parent pipeline state keys. | None |
output_state_map | dict[str, str] | Mapping of parent pipeline state keys to subgraph output keys. | None |
runtime_config_map | dict[str, str] \| None | Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used. | None |
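The two mappings point in opposite directions, which is easy to get wrong. A small sketch of the documented behavior — missing parent keys are skipped on the way in, and missing subgraph outputs become None on the way out. The keys and the pretend subgraph result are hypothetical:

```python
parent_state = {"question": "why?", "history": []}

input_state_map = {"query": "question"}        # subgraph key -> parent key
output_state_map = {"final_answer": "answer"}  # parent key -> subgraph key

# Build the subgraph input, skipping parent keys that are missing.
subgraph_input = {
    sub_key: parent_state[parent_key]
    for sub_key, parent_key in input_state_map.items()
    if parent_key in parent_state
}

subgraph_result = {"answer": "because"}  # pretend subgraph output

# Map subgraph outputs back; missing keys become None, as documented.
update = {
    parent_key: subgraph_result.get(sub_key)
    for parent_key, sub_key in output_state_map.items()
}
print(update)
# {'final_answer': 'because'}
```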
execute(state, config)
async
Executes the subgraph and processes its output.
This method prepares data, executes the subgraph, and formats the output for integration into the parent pipeline state. It only uses keys that are actually present in the state, ignoring missing keys to prevent errors.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

Type | Description |
---|---|
dict[str, Any] | The update to the pipeline state after this step's operation. This includes new or modified data produced by the subgraph, not the entire state. If a requested output key is not present in the subgraph result, its value will be None. |

Raises:

Type | Description |
---|---|
RuntimeError | If an error occurs during subgraph execution, with details about which step caused the error. |
get_mermaid_diagram()
Create a Mermaid diagram representation of the subgraph.
Returns:

Name | Type | Description |
---|---|---|
str | str | The complete Mermaid diagram representation of the subgraph. |
TerminatorStep(name)
Bases: BasePipelineStep
A step that connects previous steps to the END node.
This step is useful when you want to explicitly terminate a branch or the entire pipeline. It has no processing logic and simply acts as a connection point to the END node.
Example:

    pipeline = (
        step_a
        | ConditionalStep(
            name="branch",
            branches={
                "terminate": TerminatorStep("early_end"),
                "continue": step_b
            },
            condition=lambda x: "terminate" if x["should_stop"] else "continue"
        )
        | step_c
    )
Attributes:

Name | Type | Description |
---|---|---|
name | str | A unique identifier for this pipeline step. |

Initializes a new pipeline step.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for the pipeline step. | required |
add_to_graph(graph, previous_endpoints)
Adds this step to the graph and connects it to the END node.
This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
graph | StateGraph | The graph to add this step to. | required |
previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |

Returns:

Type | Description |
---|---|
list[str] | An empty list, as this step has no endpoints (it terminates the flow). |
execute(state, config)
async
Executes this step, which does nothing but pass through the state.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current pipeline state. | required |
config | RunnableConfig | The runtime configuration. | required |
bundle(input_states, output_state, name=None)
Create a StateOperatorStep to combine multiple state keys.
This function creates a StateOperatorStep that combines multiple keys from the pipeline state into a single output without modifying the data.
Usage example:

    bundle_step = bundle(["input1", "input2"], "output")

This will cause the step to bundle the values of input1 and input2 from the pipeline state into a single dictionary. The result will be stored in the pipeline state under the key output.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_states | list[str] | List of input state keys to be bundled. | required |
output_state | str \| list[str] | Key(s) to store the bundled data in the pipeline state. | required |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Bundle" followed by a unique identifier. | None |

Returns:

Name | Type | Description |
---|---|---|
StateOperatorStep | StateOperatorStep | An instance of StateOperatorStep configured to bundle the input states. |
guard(condition, success_branch, failure_branch=None, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)
Create a GuardStep with a concise syntax.
This function creates a GuardStep that can terminate pipeline execution if a condition is not met.
Usage example:

    auth_check = lambda state: state["is_authenticated"]
    success_step = step(SuccessHandler(), {"input": "input"}, "output")
    error_step = step(ErrorHandler(), {"error": "auth_error"}, "error_message")
    guard_step = guard(
        auth_check,
        success_branch=success_step,
        failure_branch=error_step,
        input_state_map={"user_id": "current_user"},
        output_state="auth_result"
    )
Parameters:

Name | Type | Description | Default |
---|---|---|---|
condition | ConditionType \| Callable[[dict[str, Any]], bool] | The condition to evaluate. | required |
success_branch | BasePipelineStep \| list[BasePipelineStep] | Steps to execute if condition is True. | required |
failure_branch | BasePipelineStep \| list[BasePipelineStep] \| None | Steps to execute if condition is False. If None, pipeline terminates immediately. Defaults to None. | None |
input_state_map | dict[str, str] \| None | Mapping of condition input arguments to pipeline state keys. Defaults to None. | None |
output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of condition input arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Guard" followed by the condition's function name and a unique identifier. | None |

Returns:

Name | Type | Description |
---|---|---|
GuardStep | GuardStep | An instance of GuardStep configured with the provided parameters. |
if_else(condition, if_branch, else_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)
Create a simple ConditionalStep with two branches.
This function creates a ConditionalStep that executes one of two branches based on a condition.
The condition can be one of:
1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (will be converted to "true"/"false").
For boolean conditions and string conditions, True/true/TRUE maps to the if_branch and False/false/FALSE maps to the else_branch.
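The conversion described above can be sketched in plain Python. This is a hypothetical helper for illustration only; the library's actual normalization code may differ:

```python
def normalize_condition_result(result) -> str:
    """Normalize a condition result to the branch labels "true"/"false"."""
    # Booleans map directly to the two branch labels.
    if isinstance(result, bool):
        return "true" if result else "false"
    # Strings are accepted case-insensitively, but must be "true" or "false".
    if isinstance(result, str) and result.lower() in ("true", "false"):
        return result.lower()
    raise ValueError(f"Condition must yield a boolean or 'true'/'false', got {result!r}")
```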
Usage example with a Callable condition:
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["input"] > data["threshold"]
if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")
if_else_step = if_else(
condition,
if_branch,
else_branch,
# input_state_map and runtime_config_map are ignored for Callable conditions,
# but can still be specified (they will have no effect)
output_state="condition_result",
fixed_args={"threshold": 0} # This will be merged with state and config
)
This will cause the step to execute the PositiveComponent if the `input` in the pipeline state is greater than the threshold (0), and the NegativeComponent otherwise. The result of the condition will be stored in the pipeline state under the key `condition_result`.
Usage example with a Component condition:
# Using a Component condition - requires input_state_map and runtime_config_map
threshold_checker = ThresholdChecker() # A Component that returns "true" or "false"
if_branch = step(PositiveComponent(), {"input": "input"}, "output")
else_branch = step(NegativeComponent(), {"input": "input"}, "output")
if_else_step = if_else(
threshold_checker,
if_branch,
else_branch,
input_state_map={"value": "input"}, # Maps pipeline state to component input
output_state="condition_result",
runtime_config_map={"threshold": "threshold_config"}, # Maps runtime config to component input
fixed_args={"strict_mode": True} # Fixed arguments passed directly to component
)
This will cause the step to execute the ThresholdChecker component with the `input` from the pipeline state as its `value` parameter and the `threshold_config` from runtime configuration as its `threshold` parameter. Based on the component's result ("true" or "false"), it will execute either the PositiveComponent or the NegativeComponent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
condition | ConditionType \| Callable[[dict[str, Any]], bool] | The condition to evaluate. | required |
if_branch | BasePipelineStep \| list[BasePipelineStep] | Step(s) to execute if condition is true. | required |
else_branch | BasePipelineStep \| list[BasePipelineStep] | Step(s) to execute if condition is false. | required |
input_state_map | dict[str, str] \| None | Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component; it is ignored for Callable conditions. Defaults to None. | None |
output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component; it is ignored for Callable conditions. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None, in which case an empty dictionary is used. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "IfElse" followed by the condition's function name and a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
ConditionalStep | ConditionalStep | An instance of ConditionalStep configured with the provided parameters. |
log(message, is_template=True, emit_kwargs=None, name=None)
Create a specialized step for logging messages.
This function creates a LogStep that logs messages within a pipeline. It can be used to log status updates, debugging information, or any other text during pipeline execution.
The message can be a plain string or a template with placeholders for state variables.
Usage example 1 (plain message):
log_step = log("Processing document", is_template=False)
Usage example 2 (template message with state variables):
log_step = log("Processing query: {query} with model: {model_name}")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The message to be logged. May contain placeholders in curly braces for state variables. | required |
is_template | bool | Whether the message is a template with placeholders. Defaults to True. | True |
emit_kwargs | dict[str, Any] \| None | Additional keyword arguments to pass to the event emitter. Defaults to None. | None |
name | str \| None | A unique identifier for this pipeline step. If None, a name will be auto-generated with the prefix "log_". Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
LogStep | LogStep | A specialized pipeline step for logging messages. |
map_reduce(input_state_map, output_state, map_func, reduce_func=lambda results: results, runtime_config_map=None, fixed_args=None, name=None)
Create a MapReduceStep that maps a function over multiple inputs and reduces the results.
This function creates a step that applies a mapping function to multiple inputs in parallel and combines the results using a reduction function.
The `map_func` receives a dictionary for each item being processed. This dictionary contains:
1. Values from `input_state_map` (with list inputs split into individual items).
2. Values from `runtime_config_map` (if provided).
3. Values from `fixed_args` (if provided).
The `map_func` can be either:
- A callable function that takes a dictionary as input and returns a result.
- A `Component` instance, which will be executed with proper async handling.
Important note on parallel execution:
1. For true parallelism, the `map_func` MUST be an async function or a `Component`.
2. Synchronous map functions will block the event loop and run sequentially.
The step supports automatic broadcasting of scalar values and handles lists appropriately:
1. If multiple list inputs are provided, they must be the same length.
2. Scalar inputs are broadcast to match list lengths.
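The broadcasting rules can be illustrated with a small self-contained sketch. This is assumed behavior for illustration, not the library's actual implementation:

```python
def broadcast_inputs(inputs: dict) -> list[dict]:
    """Split list inputs into per-item dicts, broadcasting scalar values."""
    # All list inputs must share a single length.
    lengths = {len(value) for value in inputs.values() if isinstance(value, list)}
    if len(lengths) > 1:
        raise ValueError(f"List inputs must have the same length, got {sorted(lengths)}")
    n = lengths.pop() if lengths else 1
    # Build one dict per item: lists are indexed, scalars are repeated.
    return [
        {key: (value[i] if isinstance(value, list) else value) for key, value in inputs.items()}
        for i in range(n)
    ]
```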
Usage Examples: Example 1: Processing a list of items with an async map function.
async def count_words(item):
await asyncio.sleep(0.1) # Simulate I/O operation
return len(item["document"].split())
process_docs = map_reduce(
input_state_map={
"document": "documents" # A list, e.g. ["doc1...", "doc2...", "doc3..."]
},
output_state="word_counts", # A list of word counts for each document
map_func=count_words,
reduce_func=lambda results: sum(results), # Sum word counts
)
# When executed with {"documents": ["doc1...", "doc2...", "doc3..."]},
# returns {"word_counts": 60} (total word count)
Example 2: Broadcasting scalar values to match list length
# Apply a common threshold to multiple values
threshold_check = map_reduce(
input_state_map={
"value": "values", # A list: [5, 10, 15]
"threshold": "threshold", # A scalar: 8 (will be broadcast)
},
output_state="above_threshold",
map_func=lambda item: item["value"] > item["threshold"],
reduce_func=lambda results: results # Return list of boolean results
)
# When executed with {"values": [5, 10, 15], "threshold": 8},
# returns {"above_threshold": [False, True, True]}
Example 3: Multiple list inputs with the same length
similarity_step = map_reduce(
input_state_map={
"doc1": "documents_a", # ["doc1", "doc2", "doc3"]
"doc2": "documents_b", # ["docA", "docB", "docC"]
},
output_state="similarity_scores",
map_func=lambda item: calculate_similarity(item["doc1"], item["doc2"]),
reduce_func=lambda results: sum(results) / len(results) # Average similarity
)
# When executed with {"documents_a": ["doc1", "doc2", "doc3"], "documents_b": ["docA", "docB", "docC"]},
# returns {"similarity_scores": 0.75}
Example 4: Using a Component for complex processing instead of a map function
summarizer = TextSummarizer() # Subclass of Component
summarize_step = map_reduce(
input_state_map={
"text": "documents", # List of documents to summarize
"max_length": "max_length", # Scalar parameter (broadcasted)
},
output_state="summaries",
map_func=summarizer,
reduce_func=lambda results: [r["summary"] for r in results]
)
# When executed with {"documents": ["doc1...", "doc2..."], "max_length": 50},
# returns {"summaries": ["summary1...", "summary2..."]}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_state_map | dict[str, str] | Mapping of function arguments to pipeline state keys. | required |
output_state | str | Key to store the reduced result in the pipeline state. | required |
map_func | Component \| Callable[[dict[str, Any]], Any] | Function to apply to each input item. The map function receives a dictionary containing the input values derived from input_state_map, runtime_config_map, and fixed_args. | required |
reduce_func | Callable[[list[Any]], Any] | Function to reduce the mapped results. Defaults to a function that returns the list of results as is. | lambda results: results |
runtime_config_map | dict[str, str] \| None | Mapping of arguments to runtime config keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to pass to the functions. Defaults to None. | None |
name | str \| None | A unique identifier for this step. Defaults to None, in which case the name will be "MapReduce" followed by the map function name. | None |
Returns:
Name | Type | Description |
---|---|---|
MapReduceStep | MapReduceStep | An instance of MapReduceStep configured with the provided parameters. |
no_op(name=None)
Create a NoOpStep to add a step that does nothing.
This function creates a step that performs no operation, passing the pipeline state through unchanged.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "NoOp" followed by a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
NoOpStep | NoOpStep | An instance of NoOpStep. |
parallel(branches, input_states=None, squash=True, runtime_config_map=None, fixed_args=None, name=None)
Create a ParallelStep that executes multiple branches concurrently.
This function creates a ParallelStep that runs multiple branches in parallel and merges their results. Each branch can be a single step or a list of steps to execute sequentially.
The step supports two execution modes controlled by the `squash` parameter:
1. Squashed (default): uses asyncio.gather() to run branches in parallel within a single LangGraph node. Trade-offs:
   a. Better raw performance
   b. Simpler implementation
   c. Less overhead
   d. Less transparent for debugging and tracing
2. Expanded (squash=False): creates a native LangGraph structure with multiple parallel paths. Trade-offs:
   a. More native LangGraph integration
   b. More transparent for debugging and tracing
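The squashed mode can be sketched roughly as follows. This is a simplified illustration of the asyncio.gather() approach; the real ParallelStep operates on pipeline steps, not bare callables, and its merge semantics may differ:

```python
import asyncio

async def run_squashed(branches, state: dict) -> dict:
    """Run branch coroutines concurrently and merge their result dicts."""
    # Each branch receives its own copy of the state and returns a partial update.
    results = await asyncio.gather(*(branch(dict(state)) for branch in branches))
    # Merge the partial updates back into the original state.
    merged = dict(state)
    for result in results:
        merged.update(result)
    return merged
```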
For memory optimization, you can specify input_states to pass only specific keys to branches. This is especially useful when the state is large but branches only need specific parts of it. If input_states is None (default), all state keys will be passed.
Usage example:
parallel_step = parallel(
branches=[
step(ComponentA(), {"input": "query"}, "output_a"),
[
step(ComponentB1(), {"input": "query"}, "output_b1"),
step(ComponentB2(), {"input": "output_b1"}, "output_b2")
],
step(ComponentC(), {"input": "query"}, "output_c")
],
input_states=["query"], # Only 'query' will be passed to branches
)
Important note about input_states:
1. If input_states is provided, only the keys listed will be passed to the branches.
2. If input_states is None (default), all state keys will be passed to the branches.
3. Each branch is responsible for its own state management once execution begins.
4. For sequential branches (lists of steps), later steps in the branch can still access outputs from earlier steps in the same branch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
branches | list[BasePipelineStep \| list[BasePipelineStep]] | List of branches to execute in parallel. Each branch can be a single step or a list of steps to execute sequentially. | required |
input_states | list[str] \| None | Keys from the state to pass to branches. If None, all state keys will be passed. Defaults to None. | None |
squash | bool | Whether to squash execution into a single node. If True, uses asyncio.gather() to run branches in parallel. If False, uses native LangGraph structures for parallelism. Defaults to True. | True |
runtime_config_map | dict[str, str] \| None | Mapping of input keys to runtime config keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to include in the state passed to branches. Defaults to None. | None |
name | str \| None | A unique identifier for this parallel step. Defaults to None, in which case a name will be auto-generated. | None |
Returns:
Name | Type | Description |
---|---|---|
ParallelStep | ParallelStep | An instance of ParallelStep configured with the provided branches. |
step(component, input_state_map, output_state=None, runtime_config_map=None, fixed_args=None, emittable=True, name=None)
Create a ComponentStep with a concise syntax.
This function creates a ComponentStep, which wraps a component and manages its inputs and outputs within the pipeline.
Usage example 1:
retriever = Retriever()
retriever_step = step(retriever, {"query": "input_query"}, "retrieved_data")
This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the `input_query` from the pipeline state to the `query` argument of the Retriever.
2. It will store the `retrieved_data` from the Retriever result in the pipeline state.
Usage example 2:
retriever = Retriever()
custom_emitter = CustomEventEmitter()
retriever_step = step(
retriever, {"query": "input_query"}, "retrieved_data", {"top_k": "top_k"}, {"event_emitter": custom_emitter}
)
This will cause the step to execute the Retriever component with the following behavior:
1. It will pass the `input_query` from the pipeline state to the `query` argument of the Retriever.
2. It will pass the `top_k` from the runtime configuration to the `top_k` argument of the Retriever.
3. It will pass the `custom_emitter` to the `event_emitter` argument of the Retriever without modification.
4. It will store the `retrieved_data` from the Retriever result in the pipeline state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component | Component | The component to be executed in this step. | required |
input_state_map | dict[str, str] | Mapping of component input arguments to pipeline state keys. | required |
output_state | str \| list[str] \| None | Key(s) to extract from the component result and add to the pipeline state. If None, the component is executed but no state updates are performed. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of component arguments to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the component. Defaults to None, in which case an empty dictionary is used. | None |
emittable | bool | Whether an event emitter should be passed to the component, if available in the state and not explicitly provided in any of the arguments. Defaults to True. | True |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be the component's class name followed by a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
ComponentStep | ComponentStep | An instance of ComponentStep configured with the provided parameters. |
subgraph(subgraph, input_state_map=None, output_state_map=None, runtime_config_map=None, fixed_args=None, name=None)
Create a SubgraphStep that executes another pipeline as a subgraph.
This function creates a SubgraphStep that allows for encapsulation and reuse of pipeline logic by treating another pipeline as a step. The subgraph can have its own state schema, and this step handles the mapping between the parent and subgraph states.
The SubgraphStep gracefully handles missing state keys - if a key specified in input_state_map is not present in the parent state, it will be omitted from the subgraph input rather than causing an error. This allows for flexible composition of pipelines with different state schemas.
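The graceful handling of missing keys can be sketched as follows. This is an assumed simplification of the input-mapping behavior, not the library's actual code:

```python
def map_parent_to_subgraph(parent_state: dict, input_state_map: dict) -> dict:
    """Build the subgraph's input state, silently skipping keys absent from the parent.

    input_state_map maps subgraph input keys to parent pipeline state keys,
    matching the documented direction of the mapping.
    """
    return {
        subgraph_key: parent_state[parent_key]
        for subgraph_key, parent_key in input_state_map.items()
        if parent_key in parent_state  # Missing parent keys are omitted, not an error.
    }
```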
Usage example:
from typing import TypedDict
from gllm_pipeline.pipeline.pipeline import Pipeline
# Define state schemas using TypedDict
class SubgraphState(TypedDict):
query: str
retrieved_data: list
reranked_data: list
class ParentState(TypedDict):
user_input: str
query: str
reranked: list
response: str
# Define a subgraph pipeline with its own state schema
subgraph_pipeline = Pipeline(
[
step(Retriever(), {"query": "query"}, "retrieved_data"),
step(Reranker(), {"data": "retrieved_data"}, "reranked_data")
],
state_type=SubgraphState
)
# Use the subgraph in a parent pipeline
parent_pipeline = Pipeline(
[
step(QueryProcessor(), {"input": "user_input"}, "query"),
subgraph(
subgraph_pipeline,
{"query": "query"}, # Map parent state to subgraph input
{"reranked": "reranked_data"}, # Map subgraph output to parent state
runtime_config_map={"top_k": "retrieval_top_k"}
),
step(ResponseGenerator(), {"data": "reranked"}, "response")
],
state_type=ParentState
)
# When the parent pipeline runs:
# 1. QueryProcessor processes user_input and produces query
# 2. SubgraphStep creates a new state for the subgraph with query from parent
# 3. Subgraph executes its steps (Retriever → Reranker)
# 4. SubgraphStep maps reranked_data from subgraph to reranked in parent
# 5. ResponseGenerator uses reranked to produce response
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subgraph | Pipeline | The pipeline to be executed as a subgraph. | required |
input_state_map | dict[str, str] \| None | Mapping of subgraph input keys to parent pipeline state keys. Keys that don't exist in the parent state will be gracefully ignored. If None, all subgraph inputs will be passed as-is. | None |
output_state_map | dict[str, str] \| None | Mapping of parent pipeline state keys to subgraph output keys. If None, all subgraph outputs will be passed as-is. | None |
runtime_config_map | dict[str, str] \| None | Mapping of subgraph input keys to runtime configuration keys. Defaults to None, in which case an empty dictionary is used. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the subgraph. Defaults to None, in which case an empty dictionary is used. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
SubgraphStep | SubgraphStep | An instance of SubgraphStep configured with the provided parameters. |
switch(condition, branches, input_state_map=None, output_state=None, default=None, runtime_config_map=None, fixed_args=None, name=None)
Create a ConditionalStep with multiple branches.
This function creates a ConditionalStep that can execute one of multiple branches based on a condition.
Usage example with a Callable condition:
# Using a Callable condition - receives merged state and config directly
def extract_command(data):
# Access both state and config in a single dictionary
query = data["query"]
separator = data["separator"] # From runtime config or state
return query.split(separator)[0]
branches = {
"search": step(SearchComponent(), {"query": "query"}, "search_result"),
"filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
}
default = step(NoOpComponent(), {}, "no_op_result")
switch_step = switch(
extract_command,
branches,
# input_state_map and runtime_config_map are ignored for Callable conditions
# but can still be specified (they will have no effect)
output_state="command_type",
default=default,
fixed_args={"separator": " "} # This will be merged with state and config
)
This will cause the step to execute the SearchComponent if the first part of the `query` in the pipeline state is "search", the FilterComponent if it is "filter", and the NoOpComponent otherwise. The separator is provided as a fixed argument. The result of the condition will be stored in the pipeline state under the key `command_type`.
Usage example with a Component condition:
# Using a Component condition - requires input_state_map and runtime_config_map
command_extractor = CommandExtractor() # A Component that extracts command from query
branches = {
"search": step(SearchComponent(), {"query": "query"}, "search_result"),
"filter": step(FilterComponent(), {"query": "query"}, "filter_result"),
"sort": step(SortComponent(), {"query": "query"}, "sort_result"),
}
default = step(DefaultComponent(), {"query": "query"}, "default_result")
switch_step = switch(
command_extractor,
branches,
input_state_map={"text": "query"}, # Maps pipeline state to component input
output_state="command_type",
default=default,
runtime_config_map={"delimiter": "separator_config"}, # Maps runtime config to component input
fixed_args={"lowercase": True} # Fixed arguments passed directly to component
)
This will cause the step to execute the CommandExtractor component with the `query` from the pipeline state as its `text` parameter and the `separator_config` from runtime configuration as its `delimiter` parameter. Based on the component's result (which should be one of "search", "filter", "sort", or something else), it will execute the corresponding branch component or the default component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
condition | ConditionType | The condition to evaluate for branch selection. | required |
branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps to execute. | required |
input_state_map | dict[str, str] \| None | Mapping of condition input arguments to pipeline state keys. This is only used if the condition is a Component; it is ignored for Callable conditions. Defaults to None. | None |
output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
default | BasePipelineStep \| list[BasePipelineStep] \| None | Default branch to execute if no condition matches. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of condition input arguments to runtime configuration keys. This is only used if the condition is a Component; it is ignored for Callable conditions. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Switch" followed by the condition's function name and a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
ConditionalStep | ConditionalStep | An instance of ConditionalStep configured with the provided parameters. |
terminate(name=None)
Create a TerminatorStep to end pipeline execution.
This function creates a TerminatorStep that explicitly terminates a branch or the entire pipeline.
Usage example:
early_exit = terminate("early_exit")
pipeline = (
step_a
| if_else(should_stop, early_exit, step_b)
| step_c
)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Terminator" followed by a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
TerminatorStep | TerminatorStep | An instance of TerminatorStep. |
toggle(condition, if_branch, input_state_map=None, output_state=None, runtime_config_map=None, fixed_args=None, name=None)
Create a ConditionalStep that toggles between a branch and a no-op.
This function creates a ConditionalStep that executes a branch if the condition is true, and does nothing (no-op) if the condition is false.
The condition can be:
1. A Component that must return exactly "true" or "false".
2. A callable that returns a string ("true" or "false", case insensitive).
3. A callable that returns a boolean (will be converted to "true"/"false").
4. A string key that will be looked up in the merged state data (state + runtime config + fixed args). The value will be evaluated for truthiness: any non-empty, non-zero, non-False value will be considered True.
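The string-key form can be sketched with a hypothetical helper illustrating the documented truthiness rule (for illustration only; the library's actual lookup code may differ):

```python
def resolve_string_condition(key: str, merged: dict) -> str:
    """Resolve a string-key condition against the merged state/config/fixed-args dict.

    A missing key or any falsy value (empty string, 0, False, None, empty list)
    yields "false"; everything else yields "true".
    """
    return "true" if merged.get(key) else "false"
```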
Usage example with a Callable condition:
# Using a Callable condition - receives merged state and config directly
condition = lambda data: data["feature_enabled"] and data["user_tier"] >= 2
feature_step = step(FeatureComponent(), {"input": "input"}, "output")
toggle_step = toggle(
condition,
feature_step,
output_state="feature_status",
fixed_args={"user_tier": 2} # This will be merged with state and config
)
This will execute the FeatureComponent only if both `feature_enabled` is true and `user_tier` is at least 2. Otherwise, it will do nothing. The condition result will be stored in the pipeline state under the key `feature_status`.
Usage example with a Component condition:
# Using a Component condition - requires input_state_map and runtime_config_map
feature_checker = FeatureChecker() # A Component that returns "true" or "false"
feature_step = step(FeatureComponent(), {"input": "input"}, "output")
toggle_step = toggle(
feature_checker,
feature_step,
input_state_map={"user_id": "current_user"}, # Maps pipeline state to component input
output_state="feature_status",
runtime_config_map={"feature_name": "target_feature"}, # Maps runtime config to component input
fixed_args={"check_permissions": True} # Fixed arguments passed directly to component
)
This will cause the step to execute the FeatureChecker component with the `current_user` from the pipeline state as its `user_id` parameter and the `target_feature` from runtime configuration as its `feature_name` parameter.
Based on the component's result ("true" or "false"), it will either execute the FeatureComponent or do nothing.
Usage example with string key:
# Using a string key - looks up the value in the merged state and config
feature_step = step(FeatureComponent(), {"input": "input"}, "output")
toggle_step = toggle("feature_enabled", feature_step, runtime_config_map={"feature_enabled": "enable_feature"})
This will execute the FeatureComponent only if the value at the key `feature_enabled` in the pipeline state (or `enable_feature` in the runtime config) evaluates to True. Otherwise, it will do nothing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
condition | ConditionType \| Callable[[dict[str, Any]], bool] \| str | The condition to evaluate. | required |
if_branch | BasePipelineStep \| list[BasePipelineStep] | Step(s) to execute if condition is true. | required |
input_state_map | dict[str, str] \| None | Mapping of condition input arguments to pipeline state keys. Defaults to None. | None |
output_state | str \| None | Key to store the condition result in the pipeline state. Defaults to None. | None |
runtime_config_map | dict[str, str] \| None | Mapping of condition input arguments to runtime configuration keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Toggle" followed by a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
ConditionalStep | ConditionalStep | An instance of ConditionalStep configured with the provided parameters. |
transform(operation, input_states, output_state, runtime_config_map=None, fixed_args=None, name=None)
Create a StateOperatorStep for transforming state data.
This function creates a StateOperatorStep that applies a transformation operation to the pipeline state.
Note that the function `operation` should accept a dictionary of input data and return the operation result.
Usage example:
def sort(data: dict) -> list:
    is_reverse = data["reverse"]
    return sorted(data["chunk"], reverse=is_reverse)

transform_step = transform(sort, ["chunk"], "output", runtime_config_map={"reverse": "reverse"})
This will cause the step to execute the `sort` operation on the `chunk` in the pipeline state. The result will be stored in the pipeline state under the key `output`. The behavior is controlled by the runtime configuration key `reverse`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
operation | Callable[[dict[str, Any]], Any] | The operation to execute on the input data. | required |
input_states | list[str] | List of input state keys required by the operation. | required |
output_state | str \| list[str] | Key(s) to store the operation result in the pipeline state. | required |
runtime_config_map | dict[str, str] \| None | Mapping of operation input arguments to runtime configuration keys. Defaults to None. | None |
fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the operation. Defaults to None. | None |
name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be "Transform" followed by the operation's function name and a unique identifier. | None |
Returns:
Name | Type | Description |
---|---|---|
StateOperatorStep | StateOperatorStep | An instance of StateOperatorStep configured with the provided parameters. |