
Conditional step

A conditional pipeline step that executes a branch given a functional condition.

The condition can be a Component or a Callable. The handling of inputs differs:

1. If the condition is a Component, input_state_map and runtime_config_map are used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. input_state_map and runtime_config_map are ignored in this case.

Author

Dimitrij Ray (dimitrij.ray@gdplabs.id)

ConditionInputs(merged, mapped, event_emitter) dataclass

Container for different types of inputs used in condition evaluation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| merged | dict[str, Any] | Complete merged dictionary containing all state, config, and fixed args. Used for Callable conditions. |
| mapped | dict[str, Any] | Dictionary containing only explicitly mapped inputs. Used for Component conditions. |
| event_emitter | EventEmitter \| None | Event emitter instance for logging. |
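To illustrate how the two input views might differ, here is a minimal sketch that uses a stand-in dataclass rather than the library's own class. The helper `build_condition_inputs` is hypothetical. The merge order (state, then config, then fixed args) and the reading of both maps as `{source key: component input key}` are assumptions taken from the wording on this page, not confirmed behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ConditionInputsSketch:
    """Stand-in for ConditionInputs; event_emitter is typed loosely here."""

    merged: dict[str, Any]
    mapped: dict[str, Any]
    event_emitter: Any | None = None


def build_condition_inputs(state, config, input_state_map=None,
                           runtime_config_map=None, fixed_args=None):
    """Hypothetical helper that builds both input views for a condition."""
    # Assumed precedence: later sources override earlier ones.
    merged = {**state, **config, **(fixed_args or {})}

    # Assumed direction: {state/config key: component input key}.
    mapped = {}
    for state_key, input_key in (input_state_map or {}).items():
        mapped[input_key] = state[state_key]
    for config_key, input_key in (runtime_config_map or {}).items():
        mapped[input_key] = config[config_key]

    return ConditionInputsSketch(merged=merged, mapped=mapped)


inputs = build_condition_inputs(
    state={"query": "hello", "history": []},
    config={"lang": "en"},
    input_state_map={"query": "text"},
    runtime_config_map={"lang": "language"},
    fixed_args={"threshold": 0.5},
)
print(inputs.merged)  # everything, for a Callable condition
print(inputs.mapped)  # only the mapped keys, for a Component condition
```

In this sketch a Callable condition would read `inputs.merged`, while a Component condition would be called with the entries of `inputs.mapped`.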

ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None)

Bases: BasePipelineStep, HasInputsMixin

A pipeline step that executes one of several branches based on specified conditions.

This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.

Minimal usage requires only the branches and a condition: a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs:

1. If the condition is a Component, input_state_map and runtime_config_map are used to map the pipeline's state and config to the component's inputs.
2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. input_state_map and runtime_config_map are ignored in this case.

Example:

ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)

This will execute step_a if the query contains "<A>", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch, which is executed when the condition result matches no other branch key. If DEFAULT_BRANCH is not defined and no branch key matches, the step will raise an error.
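The example's condition can be traced without a pipeline. This sketch evaluates the same lambda against two sample dictionaries that stand in for the merged state and config. The branch targets are placeholder strings rather than real pipeline steps, and the sample keys (`user_id`) are invented for illustration.

```python
DEFAULT_BRANCH = "__default__"  # value of the special key named above

# Same condition as in the example, written with the constant.
condition = lambda x: "A" if "<A>" in x["query"] else DEFAULT_BRANCH

# Placeholder branch targets; a real ConditionalStep maps keys to steps.
branches = {"A": "step_a", DEFAULT_BRANCH: "step_b"}

tagged = {"query": "<A> summarize this", "user_id": "u1"}
plain = {"query": "summarize this", "user_id": "u1"}

selected_for_tagged = branches[condition(tagged)]
selected_for_plain = branches[condition(plain)]
print(selected_for_tagged, selected_for_plain)
```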

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | str | A unique identifier for this pipeline step. |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps. |
| condition | list[ConditionType] \| None | The condition(s) to evaluate for branch selection. |
| input_state_map | dict[str, str] \| None | A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None. |
| output_state | str \| None | Key to store the condition result in the state, if desired. |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. |
| runtime_config_map | dict[str, str] \| None | A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None. |
| fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition evaluation. |

Initializes a new ConditionalStep.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | A unique identifier for this pipeline step. | required |
| branches | dict[str, BasePipelineStep \| list[BasePipelineStep]] | Mapping of condition results to steps to execute. | required |
| condition | ConditionType \| list[ConditionType] \| None | The condition(s) to evaluate for branch selection. If a Callable, it receives the merged state and config as a single dictionary. If None, the condition is evaluated from the state. Defaults to None. | None |
| input_state_map | dict[str, str] \| None | A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None. | None |
| output_state | str \| None | Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None. | None |
| condition_aggregator | Callable[[list[Any]], str] | Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";"). | lambda x: ';'.join(str(i) for i in x) |
| runtime_config_map | dict[str, str] \| None | A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None. | None |
| fixed_args | dict[str, Any] \| None | Fixed arguments to be passed to the condition. Defaults to None. | None |
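When several conditions are supplied, their results are combined by condition_aggregator into a single string. The sketch below applies the default aggregator from the constructor signature to two made-up conditions. The conditions, the sample inputs, and the `first_only` alternative are illustrative assumptions, not part of the library.

```python
# Default aggregator, as given in the ConditionalStep signature.
default_aggregator = lambda x: ";".join(str(i) for i in x)

# Two hypothetical conditions evaluated against the same merged inputs.
conditions = [
    lambda x: "long" if len(x["query"]) > 20 else "short",
    lambda x: x.get("lang", "en"),
]
inputs = {"query": "What is a conditional step?", "lang": "id"}

results = [cond(inputs) for cond in conditions]
route_key = default_aggregator(results)
print(route_key)

# A hypothetical custom aggregator that keeps only the first result.
first_only = lambda x: str(x[0])
print(first_only(results))
```

With the default aggregator, branch keys presumably have to be written in the joined form (for example `"long;id"`) to be matched.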

add_to_graph(graph, previous_endpoints)

Integrates this step into the pipeline's internal structure.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

This method always creates a condition node that:

1. Executes the condition and stores its result if output_state is defined.
2. Acts as a pass-through router if no output_state is defined.
3. Connects to the appropriate branch based on condition evaluation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| graph | StateGraph | The internal representation of the pipeline structure. | required |
| previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |

Returns:

| Type | Description |
| --- | --- |
| list[str] | The exit points (endpoints) of all non-terminating branches. |

execute(state, config) async

Executes the conditional step, determines the route, and returns a Command.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | dict[str, Any] | The current state of the pipeline. | required |
| config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Command | Command | A LangGraph Command object with 'goto' for routing and 'update' for state changes. |
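The shape of that return value can be sketched with a stand-in dataclass that carries only the two fields named above. `CommandSketch` and `execute_sketch` are hypothetical names. In the real step the `goto` target is presumably a graph node name rather than the raw branch key, and the rule that `update` is filled only when an output key is given is an assumption based on the output_state description.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CommandSketch:
    """Stand-in for a LangGraph Command, limited to 'goto' and 'update'."""

    goto: str
    update: dict[str, Any] = field(default_factory=dict)


async def execute_sketch(state, route_for, output_state=None):
    """Hypothetical routing-only step: pick a target, optionally record it."""
    route = route_for(state)
    # Assumed: the result is written to state only when output_state is set.
    update = {output_state: route} if output_state else {}
    return CommandSketch(goto=route, update=update)


route_for = lambda s: "A" if "<A>" in s["query"] else "__default__"

recorded = asyncio.run(
    execute_sketch({"query": "<A> hi"}, route_for, output_state="selected_branch")
)
pass_through = asyncio.run(execute_sketch({"query": "hi"}, route_for))
print(recorded)
print(pass_through)
```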

execute_direct(state, config) async

Execute this step directly, handling both branch selection and execution.

This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | dict[str, Any] | The current state of the pipeline. | required |
| config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
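The difference from execute() is that selection and execution happen in one call. A rough sketch under stated assumptions: steps are modelled as plain async functions returning an update dictionary or None, the steps of a branch run in order, and each step sees the updates of the previous ones. None of these details are confirmed by this page, and all names are hypothetical.

```python
import asyncio

DEFAULT_BRANCH = "__default__"


async def upper_step(state):
    """Hypothetical step that returns a state update."""
    return {"answer": state["query"].upper()}


async def noop_step(state):
    """Hypothetical step with nothing to update."""
    return None


async def execute_direct_sketch(state, condition, branches):
    """Select a branch, run its step(s) in order, and collect the updates."""
    key = condition(state)
    selected = branches.get(key, branches.get(DEFAULT_BRANCH, []))
    steps = selected if isinstance(selected, list) else [selected]

    updates = {}
    for step in steps:
        # Assumed: later steps see the updates produced by earlier ones.
        result = await step({**state, **updates})
        if result:
            updates.update(result)
    return updates or None


branches = {"A": [upper_step, noop_step], DEFAULT_BRANCH: noop_step}
cond = lambda s: "A" if "<A>" in s["query"] else DEFAULT_BRANCH

hit = asyncio.run(execute_direct_sketch({"query": "<A> hi"}, cond, branches))
miss = asyncio.run(execute_direct_sketch({"query": "hi"}, cond, branches))
print(hit, miss)
```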

get_all_steps()

Gets all steps from all branches.

Returns:

| Type | Description |
| --- | --- |
| list[BasePipelineStep] | A list of all steps in all branches. |
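Because a branch may hold either a single step or a list of steps, collecting them amounts to a flatten. The following is a minimal sketch with strings standing in for steps. Whether the real method also descends into nested composite steps is not stated here, so this version deliberately does not.

```python
def all_steps_sketch(branches):
    """Flatten a branch mapping into one list, keeping insertion order."""
    steps = []
    for branch in branches.values():
        if isinstance(branch, list):
            steps.extend(branch)
        else:
            steps.append(branch)
    return steps


# Strings stand in for BasePipelineStep instances.
branches = {"A": ["retrieve", "rerank"], "__default__": "answer"}
flat = all_steps_sketch(branches)
print(flat)
```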

get_mermaid_diagram()

Create a Mermaid diagram representation of the conditional step.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The complete Mermaid diagram representation of the conditional branches. |
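The exact diagram layout is not documented on this page. As an illustration only, a branch mapping could be rendered as a Mermaid flowchart along these lines, where the node id `cond`, the top-down direction, and the use of branch keys as edge labels are all assumptions.

```python
def mermaid_sketch(name, branches):
    """Render a branch mapping as a Mermaid flowchart (illustrative layout)."""
    lines = ["flowchart TD", f"    cond{{{name}}}"]
    for key, branch in branches.items():
        steps = branch if isinstance(branch, list) else [branch]
        previous, label = "cond", f"|{key}|"
        for step in steps:
            # Only the first edge of a branch carries the branch key as label.
            lines.append(f"    {previous} -->{label} {step}")
            previous, label = step, ""
    return "\n".join(lines)


diagram = mermaid_sketch(
    "UseCaseSelection",
    {"A": ["step_a1", "step_a2"], "__default__": "step_b"},
)
print(diagram)
```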

select_path(state, config) async

Determines the logical route key based on the evaluated condition(s).

This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
| config | RunnableConfig | Runtime configuration for this step's execution. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key. |
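The selection flow described for select_path (evaluate, aggregate, then fall back) can be approximated as follows. This is a sketch under assumptions: conditions are plain callables taking the merged inputs, any exception counts as "an error", and `select_path_sketch` is a hypothetical name rather than the library's implementation.

```python
import asyncio

DEFAULT_BRANCH = "__default__"


async def select_path_sketch(inputs, conditions, branches,
                             aggregator=lambda x: ";".join(str(i) for i in x)):
    """Evaluate conditions, aggregate them, and fall back to DEFAULT_BRANCH."""
    try:
        results = [cond(inputs) for cond in conditions]
        key = aggregator(results)
    except Exception:
        # Per the return description: errors route to the default branch.
        return DEFAULT_BRANCH
    # An aggregated result with no matching branch key also falls back.
    return key if key in branches else DEFAULT_BRANCH


branches = {"A": "step_a", DEFAULT_BRANCH: "step_b"}
by_tag = lambda x: "A" if "<A>" in x["query"] else "B"

matched = asyncio.run(select_path_sketch({"query": "<A> hi"}, [by_tag], branches))
unmatched = asyncio.run(select_path_sketch({"query": "hi"}, [by_tag], branches))
failed = asyncio.run(select_path_sketch({}, [by_tag], branches))  # KeyError inside
print(matched, unmatched, failed)
```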