Conditional step
A conditional pipeline step that executes a branch given a functional condition.
The condition can be a Component or a Callable. The handling of inputs differs:
1. If the condition is a Component, input_state_map and runtime_config_map are
used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the
pipeline's state and config directly. input_state_map and runtime_config_map
are ignored in this case.
ConditionInputs(merged, mapped, event_emitter, has_mapped_specs)
dataclass
Container for different types of inputs used in condition evaluation.
Attributes:
| Name | Type | Description |
|---|---|---|
| merged | dict[str, Any] | Complete merged dictionary containing all state, config, and fixed args. Used for Callable conditions. |
| mapped | dict[str, Any] | Dictionary containing only explicitly mapped inputs. Used for Component conditions. |
| event_emitter | EventEmitter \| None | Event emitter instance for logging. |
| has_mapped_specs | bool | Whether the mapped inputs have specs or are a literal value. |
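The difference between the merged and mapped inputs can be illustrated with a small stand-in. This is only a sketch: `ConditionInputsSketch` and `build_inputs` are hypothetical names, and both the merge precedence (state, then config, then fixed args) and the direction of the input map (argument name to source key) are assumptions, not documented behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ConditionInputsSketch:
    """Hypothetical stand-in that mirrors the documented attribute names."""

    merged: dict[str, Any]
    mapped: dict[str, Any]
    event_emitter: Any = None  # placeholder; no emitter is used in this sketch
    has_mapped_specs: bool = False  # placeholder default, semantics not modeled


def build_inputs(state, config, fixed_args, input_map):
    # Assumption: later sources override earlier ones on key collisions.
    merged = {**state, **config, **fixed_args}
    # Assumption: input_map maps an argument name to a key in the merged data.
    mapped = {arg: merged[key] for arg, key in input_map.items()}
    return ConditionInputsSketch(merged=merged, mapped=mapped)


inputs = build_inputs(
    state={"query": "hello", "history": []},
    config={"lang": "en"},
    fixed_args={"threshold": 0.5},
    input_map={"text": "query"},
)
print(inputs.merged)  # everything, as a Callable condition would see it
print(inputs.mapped)  # only the explicitly mapped argument
```

In this sketch a Callable condition would read `inputs.merged`, while a Component condition would be called with `inputs.mapped`.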
ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)
Bases: BranchingStep, HasInputsMixin
A conditional pipeline step that conditionally executes different branches based on specified conditions.
This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.
A minimal usage requires defining the branches to execute based on a condition, which is a callable
that takes input from the state and returns a string identifying the branch to execute.
The condition can be a Component or a Callable. The handling of inputs differs:
1. If the condition is a Component, input_map is used to map the pipeline's
state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the
pipeline's state and config directly. In this case, input_map is not used
to build the payload and should not be passed.
Example:
ConditionalStep(
name="UseCaseSelection",
branches={"A": step_a, DEFAULT_BRANCH: step_b},
condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)
This will execute step_a if the query contains "<A>", and step_b otherwise.
The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute
if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches,
the step will raise an error.
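The fallback rule described above can be sketched without the library. `pick_branch` is a hypothetical helper, the string placeholders stand in for real steps, and the use of `KeyError` is an assumption, since the text only says that an error is raised.

```python
DEFAULT_BRANCH = "__default__"  # the special key named in the text above


def pick_branch(branches, key):
    """Return the branch for `key`, falling back to the default branch."""
    if key in branches:
        return branches[key]
    if DEFAULT_BRANCH in branches:
        return branches[DEFAULT_BRANCH]
    # Assumption: the real step may raise a different exception type.
    raise KeyError(f"No branch for {key!r} and no default branch defined")


def condition(x):
    return "A" if "<A>" in x["query"] else DEFAULT_BRANCH


# Strings stand in for pipeline steps to keep the sketch self-contained.
branches = {"A": "step_a", DEFAULT_BRANCH: "step_b"}

selected = pick_branch(branches, condition({"query": "<A> please"}))
fallback = pick_branch(branches, condition({"query": "something else"}))
print(selected, fallback)
```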
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for this pipeline step. |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps. |
| condition | list[ConditionType] \| None | The condition(s) to evaluate for branch selection. |
| input_map | dict[str, str \| Val] \| None | Unified input map. |
| output_state | str \| None | Key to store the condition result in the state, if desired. |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. |
Initializes a new ConditionalStep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for this pipeline step. | required |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps to execute. | required |
| condition | ConditionType \| list[ConditionType] \| None | The condition(s) to evaluate for branch selection. If a | None |
| input_state_map | dict[str, str] \| None | A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a | None |
| output_state | str \| None | Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None. | None |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";"). | lambda x: ';'.join(str(i) for i in x) |
| runtime_config_map | dict[str, str] \| None | A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a | None |
| fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None. | None |
| input_map | InputMapSpec \| None | Unified input map. Can be a dict (arg -> str\|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None. | None |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache_store | 'BaseCache' \| None | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | None |
| cache_config | dict[str, Any] \| None | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | None |
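As a rough illustration of the condition_aggregator default, the sketch below applies the same semicolon join to the results of two conditions. The two lambdas and the input dictionary are made up for the example; only the join expression comes from the signature above.

```python
def default_aggregator(results):
    # Same expression as the documented default: join results with ";".
    return ";".join(str(i) for i in results)


# Hypothetical conditions, each returning one part of the route key.
conditions = [
    lambda x: "A" if x["score"] > 0.5 else "B",
    lambda x: x["lang"],
]

data = {"score": 0.9, "lang": "en"}
route_key = default_aggregator([cond(data) for cond in conditions])
print(route_key)  # A;en
```

With this default, a step that uses several conditions would need branch keys written in the joined form, such as "A;en".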
execute(state, runtime)
async
Executes the conditional step, determines the route, and returns a Command.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for this step's execution. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Command | Command | A LangGraph Command object with 'goto' for routing and 'update' for state changes. |
execute_direct(state, runtime)
async
Execute this step directly, handling both branch selection and execution.
This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | dict[str, Any] | The current state of the pipeline. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for this step's execution. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
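The contrast between selecting a route and selecting plus executing it can be sketched with plain coroutines. Everything here is hypothetical (`select_only`, `select_and_execute`, the two branch functions); it assumes branches are awaitable callables that return state updates or None, which is a simplification of real pipeline steps.

```python
import asyncio

DEFAULT_BRANCH = "__default__"


async def step_a(state):
    return {"answer": "handled by A"}  # state updates


async def step_b(state):
    return None  # this branch produces no updates


BRANCHES = {"A": step_a, DEFAULT_BRANCH: step_b}


async def select_only(state):
    # Analogous to selection alone: returns a route key, runs nothing.
    return "A" if "<A>" in state["query"] else DEFAULT_BRANCH


async def select_and_execute(state):
    # Analogous to direct execution: pick the branch, then run it.
    key = await select_only(state)
    return await BRANCHES[key](state)


route = asyncio.run(select_only({"query": "<A> hi"}))
updates = asyncio.run(select_and_execute({"query": "<A> hi"}))
no_updates = asyncio.run(select_and_execute({"query": "hi"}))
print(route, updates, no_updates)
```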
select_path(state, runtime)
async
Determines the logical route key based on the evaluated condition(s).
This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for this step's execution. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key. |
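The return rule for select_path can be approximated as follows. `select_path_sketch` is a hypothetical function, not the library's implementation; it assumes a single synchronous condition and treats any exception as the "error occurs" case.

```python
import asyncio

DEFAULT_BRANCH = "__default__"


async def select_path_sketch(condition, branches, data):
    """Return a route key, falling back to DEFAULT_BRANCH as described above."""
    try:
        key = str(condition(data))
    except Exception:
        # Assumption: any failure while evaluating the condition counts as an error.
        return DEFAULT_BRANCH
    # An unknown key also falls back to the default route.
    return key if key in branches else DEFAULT_BRANCH


branches = {"A": "step_a", DEFAULT_BRANCH: "step_b"}

matched = asyncio.run(select_path_sketch(lambda x: "A", branches, {}))
unmatched = asyncio.run(select_path_sketch(lambda x: "Z", branches, {}))
errored = asyncio.run(select_path_sketch(lambda x: x["missing"], branches, {}))
print(matched, unmatched, errored)
```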