Skip to content

Conditional step

A conditional pipeline step that executes a branch given a functional condition.

The condition can be a Component or a Callable. The handling of inputs differs: 1. If the condition is a Component, input_state_map and runtime_config_map are used to map the pipeline's state and config to the component's inputs. 2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. input_state_map and runtime_config_map are ignored in this case.

Author

Dimitrij Ray (dimitrij.ray@gdplabs.id) Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

NONE

ConditionInputs(merged, mapped, event_emitter, has_mapped_specs) dataclass

Container for different types of inputs used in condition evaluation.

Attributes:

Name Type Description
merged dict[str, Any]

Complete merged dictionary containing all state, config, and fixed args. Used for Callable conditions.

mapped dict[str, Any]

Dictionary containing only explicitly mapped inputs. Used for Component conditions.

event_emitter EventEmitter | None

Event emitter instance for logging.

has_mapped_specs bool

Whether the mapped inputs have specs or is a literal value.

ConditionalStep(name, branches, condition=None, input_state_map=None, output_state=None, condition_aggregator=lambda x: ';'.join(str(i) for i in x), runtime_config_map=None, fixed_args=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: BranchingStep, HasInputsMixin

A conditional pipeline step that conditionally executes different branches based on specified conditions.

This step evaluates one or more conditions and selects a branch to execute based on the result. It provides flexibility in defining complex conditional logic within a pipeline.

A minimal usage requires defining the branches to execute based on a condition, which is a callable that takes input from the state and returns a string identifying the branch to execute.

The condition can be a Component or a Callable. The handling of inputs differs: 1. If the condition is a Component, input_map is used to map the pipeline's state and config to the component's inputs. 2. If the condition is a Callable, it receives a merged dictionary of the pipeline's state and config directly. In this case, input_map is not used to build the payload and should not be passed.

Example:

ConditionalStep(
    name="UseCaseSelection",
    branches={"A": step_a, DEFAULT_BRANCH: step_b},
    condition=lambda x: "A" if "<A>" in x["query"] else "__default__"
)

This will execute step_a if the query contains "", and step_b otherwise.

The special key __default__ (importable as DEFAULT_BRANCH) defines the default branch to execute if no other condition matches. If the DEFAULT_BRANCH is not defined and no condition matches, the step will raise an error.

Attributes:

Name Type Description
name str

A unique identifier for this pipeline step.

branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps.

condition list[ConditionType] | None

The condition(s) to evaluate for branch selection.

input_map dict[str, str | Val] | None

Unified input map.

output_state str | None

Key to store the condition result in the state, if desired.

condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results.

retry_policy RetryPolicy | None

Configuration for retry behavior using LangGraph's RetryPolicy.

error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution.

Initializes a new ConditionalStep.

Parameters:

Name Type Description Default
name str

A unique identifier for this pipeline step.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]]

Mapping of condition results to steps to execute.

required
condition ConditionType | list[ConditionType] | None

The condition(s) to evaluate for branch selection. If a Callable, it receives the merged state and config as keyword arguments. If None, the condition is evaluated from the state. Defaults to None.

None
input_state_map dict[str, str] | None

A dictionary mapping the state keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
output_state str | None

Key to store the condition result in the state. If None, the output is not saved in the state. Defaults to None.

None
condition_aggregator Callable[[list[Any]], str]

Function to aggregate multiple condition results. Defaults to joining results with a semicolon (";").

lambda x: join(str(i) for i in x)
runtime_config_map dict[str, str] | None

A dictionary mapping the runtime config keys to the component's input keys. This is only used if the condition is a Component. Defaults to None.

None
fixed_args dict[str, Any] | None

Fixed arguments to be passed to the condition. Defaults to None.

None
input_map InputMapSpec | None

Unified input map. Can be a dict (arg -> str|Val) or a list with elements: 1. str for identity mapping 2. dict[str, str] for state/config mapping 3. dict[str, Val] for fixed args. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used.

None
cache_store 'BaseCache' | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

execute(state, runtime) async

Executes the conditional step, determines the route, and returns a Command.

Parameters:

Name Type Description Default
state PipelineState

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
Command Command

A LangGraph Command object with 'goto' for routing and 'update' for state changes.

execute_direct(state, runtime) async

Execute this step directly, handling both branch selection and execution.

This method is used when the step needs to be executed directly (e.g. in parallel execution). It will both select and execute the appropriate branch, unlike execute() which only handles selection.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Updates to apply to the pipeline state, or None if no updates.

select_path(state, runtime) async

Determines the logical route key based on the evaluated condition(s).

This method prepares input data, evaluates conditions, aggregates results, and determines the logical route key.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for this step's execution.

required

Returns:

Name Type Description
str str

The identifier of the selected logical route. Returns DEFAULT_BRANCH if an error occurs or if the condition result doesn't match any branch key.