Conditional step
A conditional pipeline step that executes a branch given a functional condition.
The condition can be a Component
or a Callable
. The handling of inputs differs:
1. If the condition is a Component
, input_state_map
and runtime_config_map
are
used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable
, it receives a merged dictionary of the
pipeline's state and config directly. input_state_map
and runtime_config_map
are ignored in this case.
References
NONE
ConditionInputs(merged, mapped, event_emitter)
dataclass
Container for different types of inputs used in condition evaluation.
Attributes:
Name | Type | Description |
---|---|---|
merged |
dict[str, Any]
|
Complete merged dictionary containing all state, config, and fixed args. Used for Callable conditions. |
mapped |
dict[str, Any]
|
Dictionary containing only explicitly mapped inputs. Used for Component conditions. |
event_emitter |
EventEmitter | None
|
Event emitter instance for logging. |
ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None)
Bases: BasePipelineStep
, HasInputsMixin
A conditional pipeline step that conditionally executes different branches based on specified conditions.
This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.
A minimal usage requires defining the branches to execute based on a condition
, which is a callable
that takes input from the state and returns a string identifying the branch to execute.
The condition can be a Component
or a Callable
. The handling of inputs differs:
1. If the condition is a Component
, input_state_map
and runtime_config_map
are
used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable
, it receives a merged dictionary of the
pipeline's state and config directly. input_state_map
and runtime_config_map
are ignored in this case.
Example:
ConditionalStep(
name="UseCaseSelection",
branches={"A": step_a, DEFAULT_BRANCH: step_b},
condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)
This will execute step_a
if the query contains "", and step_b
otherwise.
The special key __default__
(importable as DEFAULT_BRANCH) defines the default branch to execute
if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches,
the step will raise an error.
Attributes:
Name | Type | Description |
---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
branches |
dict[str, BasePipelineStep | list[BasePipelineStep]]
|
Mapping of condition results to steps. |
condition |
list[ConditionType] | None
|
The condition(s) to evaluate for branch selection. |
input_state_map |
dict[str, str] | None
|
A dictionary mapping the state keys to the component's
input keys. This is only used if the condition is a |
output_state |
str | None
|
Key to store the condition result in the state, if desired. |
condition_aggregator |
Callable[[list[Any]], str]
|
Function to aggregate multiple condition results. |
runtime_config_map |
dict[str, str] | None
|
A dictionary mapping the runtime config keys to the
component's input keys. This is only used if the condition is a |
fixed_args |
dict[str, Any] | None
|
Fixed arguments to be passed to the condition evaluation. |
Initializes a new ConditionalStep.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
A unique identifier for this pipeline step. |
required |
branches |
dict[str, BasePipelineStep | list[BasePipelineStep]]
|
Mapping of condition results to steps to execute. |
required |
condition |
ConditionType | list[ConditionType] | None
|
The condition(s) to evaluate for branch
selection. If a |
None
|
input_state_map |
dict[str, str] | None
|
A dictionary mapping the state keys to the component's
input keys. This is only used if the condition is a |
None
|
output_state |
str | None
|
Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None. |
None
|
condition_aggregator |
Callable[[list[Any]], str]
|
Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";"). |
lambda x: join(str(i) for i in x)
|
runtime_config_map |
dict[str, str] | None
|
A dictionary mapping the runtime config keys to the
component's input keys. This is only used if the condition is a
|
None
|
fixed_args |
dict[str, Any] | None
|
Fixed arguments to be passed to the condition. Defaults to None. |
None
|
add_to_graph(graph, previous_endpoints)
Integrates this step into the pipeline's internal structure.
This method is used by Pipeline
to manage the pipeline's execution flow.
It should not be called directly by users.
This method always creates a condition node that: 1. Executes the condition and stores its result if output_state is defined 2. Acts as a pass-through router if no output_state is defined 3. Connects to the appropriate branch based on condition evaluation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph |
StateGraph
|
The internal representation of the pipeline structure. |
required |
previous_endpoints |
list[str]
|
The endpoints from previous steps to connect to. |
required |
Returns:
Type | Description |
---|---|
list[str]
|
list[str]: The exit points (endpoints) of all non-terminating branches. |
execute(state, config)
async
Executes the conditional step, determines the route, and returns a Command.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
dict[str, Any]
|
The current state of the pipeline. |
required |
config |
RunnableConfig
|
Runtime configuration for this step's execution. |
required |
Returns:
Name | Type | Description |
---|---|---|
Command |
Command
|
A LangGraph Command object with 'goto' for routing and 'update' for state changes. |
execute_direct(state, config)
async
Execute this step directly, handling both branch selection and execution.
This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
dict[str, Any]
|
The current state of the pipeline. |
required |
config |
RunnableConfig
|
Runtime configuration for this step's execution. |
required |
Returns:
Type | Description |
---|---|
dict[str, Any] | None
|
dict[str, Any] | None: Updates to apply to the pipeline state, or None if no updates. |
get_all_steps()
Gets all steps from all branches.
Returns:
Type | Description |
---|---|
list[BasePipelineStep]
|
list[BasePipelineStep]: A list of all steps in all branches. |
get_mermaid_diagram()
Create a Mermaid diagram representation of the conditional step.
Returns:
Name | Type | Description |
---|---|---|
str |
str
|
The complete Mermaid diagram representation of the conditional branches. |
select_path(state, config)
async
Determines the logical route key based on the evaluated condition(s).
This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state |
dict[str, Any]
|
The current state of the pipeline, containing all data. |
required |
config |
RunnableConfig
|
Runtime configuration for this step's execution. |
required |
Returns:
Name | Type | Description |
---|---|---|
str |
str
|
The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key. |