Pipeline step
The base class for all pipeline steps.
References
[1] https://langchain-ai.github.io/langgraph/
BasePipelineStep(name, retry_config=None, error_handler=None, cache_store=None, cache_config=None)
Bases: ABC
The base class for all pipeline steps.
A pipeline step represents a single operation or task within a larger processing pipeline. Each step provides two methods:

1. execute(): performs the actual operation. Subclasses must implement it.
2. add_to_graph(): integrates the step with the pipeline structure. Overriding it is optional because a default implementation is provided.

The default implementation of add_to_graph is suitable for steps that:

1. Have a single entry point
2. Have a single exit point
3. Connect to all previous endpoints
For more complex graph structures (e.g., conditional branching), steps should override add_to_graph.
Examples:

- Basic Usage:

  ```python
  step = MyCustomStep("my_step")
  ```

- Adding Step Level Caching:

  ```python
  step = MyCustomStep(
      "my_step",
      cache_store=cache_store,
      cache_config={"ttl": 1800}
  )
  ```

- Retry Configuration:

  ```python
  retry_config = RetryConfig(max_retries=3, backoff_factor=2)
  step = MyCustomStep(
      "my_step",
      retry_config=retry_config
  )
  ```

- Error Handling:

  ```python
  step = MyCustomStep(
      "my_step",
      error_handler=error_handler
  )
  ```
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | A unique identifier for the pipeline step. |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. |
| cache_store | BaseCache \| None | The cache store used for caching step results, if configured. |
| is_cache_enabled | bool | Property indicating whether caching is enabled for this step. |
Initializes a new pipeline step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A unique identifier for the pipeline step. | required |
| retry_config | RetryConfig \| None | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. The RetryConfig is automatically converted to LangGraph's RetryPolicy when needed for internal use. | None |
| error_handler | BaseStepErrorHandler \| None | Strategy to handle errors during execution. Defaults to None, in which case the RaiseStepErrorHandler is used. | None |
| cache_store | BaseCache \| None | The cache store to use for caching step results. Defaults to None, in which case no caching is used. | None |
| cache_config | dict[str, Any] \| None | Configuration for the cache store. Supported keys: 1. key_func: A function to generate cache keys. If None, the cache instance will use its own key function. 2. name: The name of the cache. If None, the name defaults to "step_{step_name}". 3. ttl: The time-to-live for the cache. If None, the cache will not have a TTL. 4. matching_strategy: The strategy for matching cache keys. If None, the cache instance will use "exact". 5. matching_config: Configuration for the matching strategy. If None, the cache instance will use its own default matching strategy configuration. | None |
Caching Mechanism
When a cache_store is provided, the step's execution method is automatically wrapped with a cache decorator. This means:

1. Before execution, the cache is checked for existing results based on the input parameters.
2. If a cached result exists and is valid, it is returned immediately.
3. If no cached result exists, the step executes normally and the result is cached.
4. Cache keys are generated from the step's input state and configuration.
5. The cache name defaults to "step_{step_name}" if not specified.
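The check-then-store flow described above can be sketched with a plain dictionary as the store. This is a minimal illustration, not the library's decorator: `with_cache`, the JSON-based key, and the `double` step are hypothetical names chosen for the example, and TTL and matching strategies are left out.

```python
from __future__ import annotations

import asyncio
import json
from typing import Any, Awaitable, Callable


def with_cache(
    step_name: str,
    store: dict[str, Any],
    fn: Callable[[dict[str, Any]], Awaitable[Any]],
) -> Callable[[dict[str, Any]], Awaitable[Any]]:
    """Wrap an async step function with a check-then-store cache (illustrative only)."""
    cache_name = f"step_{step_name}"  # mirrors the default cache name described above

    async def wrapper(state: dict[str, Any]) -> Any:
        # Assumption: the key is derived from the JSON-serialized input state.
        key = f"{cache_name}:{json.dumps(state, sort_keys=True)}"
        if key in store:
            return store[key]  # cache hit: skip execution
        result = await fn(state)  # cache miss: run the step
        store[key] = result  # remember the result for next time
        return result

    return wrapper


call_log: list[dict[str, Any]] = []  # records every real execution


async def double(state: dict[str, Any]) -> dict[str, Any]:
    call_log.append(state)
    return {"doubled": state["value"] * 2}


async def main() -> tuple[Any, Any]:
    cached = with_cache("double", {}, double)
    first = await cached({"value": 21})
    second = await cached({"value": 21})  # same input, served from the cache
    return first, second


first, second = asyncio.run(main())
```

Both calls return the same update, but `double` runs only once, so `call_log` holds a single entry.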
is_cache_enabled: bool
property
Check if this step has caching enabled.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if caching is enabled, False otherwise. |
is_excluded: bool
property
writable
Whether this step is excluded from execution/graph integration.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if the step is excluded, False otherwise. |
__lshift__(other)
Combines this step with another step or pipeline using the '<<' operator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | BasePipelineStep \| Pipeline | The step or pipeline to add after this step. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | A new pipeline with this step followed by the other step or pipeline. |
__or__(other)
Combines this step with another step or pipeline.
This method allows for easy composition of pipeline steps using the | operator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | BasePipelineStep \| Pipeline | Another step or pipeline to combine with this one. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | A new pipeline containing the combined steps. |
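As a rough illustration of how `|` composition can be built on `__or__`, the sketch below uses hypothetical stand-in classes (`Step`, `Pipeline`, and the helper `_as_steps`); it does not reproduce the library's implementation, only the idea that each operator call returns a new pipeline.

```python
from __future__ import annotations


class Pipeline:
    """Hypothetical stand-in: an ordered list of steps."""

    def __init__(self, steps: list[Step]) -> None:
        self.steps = list(steps)

    def __or__(self, other: Step | Pipeline) -> Pipeline:
        # Append the right-hand operand's steps to a fresh pipeline.
        return Pipeline(self.steps + _as_steps(other))


class Step:
    """Hypothetical stand-in for a pipeline step."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __or__(self, other: Step | Pipeline) -> Pipeline:
        # Always return a new Pipeline; neither operand is mutated.
        return Pipeline([self] + _as_steps(other))


def _as_steps(item: Step | Pipeline) -> list[Step]:
    """Normalize a step or pipeline into a flat list of steps."""
    return item.steps if isinstance(item, Pipeline) else [item]


pipeline = Step("load") | Step("transform") | Step("save")
names = [step.name for step in pipeline.steps]

# A step can also be combined with an existing pipeline.
extended = Step("fetch") | pipeline
extended_names = [step.name for step in extended.steps]
```

Returning a new object from every operator call keeps the operands reusable, so the same step or pipeline can appear in several compositions.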
__rshift__(other)
Combines this step with another step or pipeline using the '>>' operator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | BasePipelineStep \| Pipeline | The step or pipeline to include this step in. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | A new pipeline with this step included in the other step or pipeline. |
add_to_graph(graph, previous_endpoints, retry_policy=None)
Integrates this step into the pipeline's internal structure.
This method is responsible for:

1. Adding the step's node(s) to the graph if not already present
2. Creating edges from previous endpoints to this step's entry points
3. Returning this step's exit points (endpoints)
This method provides a default implementation suitable for simple steps. Steps with more complex graph structures should override this method.
This method is used by Pipeline to manage the pipeline's execution flow.
It should not be called directly by users.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph | StateGraph | The internal representation of the pipeline structure. | required |
| previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |
| retry_policy | RetryPolicy \| None | Configuration for retry behavior using LangGraph's RetryPolicy. If None, the retry policy of the step is used. If the step is not a retryable step, this parameter is ignored. | None |
Returns:
| Type | Description |
|---|---|
| list[str] | The exit points (endpoints) of this step. |
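The three responsibilities listed for add_to_graph can be sketched for a simple single-node step. `TinyGraph` is a hypothetical stand-in that exposes only `add_node` and `add_edge`, and `add_simple_step` is an illustrative helper, not the library's default implementation; retry policy handling is omitted.

```python
from typing import Any, Callable


class TinyGraph:
    """Hypothetical stand-in graph with just add_node and add_edge."""

    def __init__(self) -> None:
        self.nodes: dict[str, Callable[..., Any]] = {}
        self.edges: list[tuple[str, str]] = []

    def add_node(self, name: str, action: Callable[..., Any]) -> None:
        self.nodes[name] = action

    def add_edge(self, start: str, end: str) -> None:
        self.edges.append((start, end))


def add_simple_step(
    graph: TinyGraph,
    name: str,
    action: Callable[..., Any],
    previous_endpoints: list[str],
) -> list[str]:
    # 1. Add the node only if it is not already present.
    if name not in graph.nodes:
        graph.add_node(name, action)
    # 2. Connect every previous endpoint to this step's single entry point.
    for endpoint in previous_endpoints:
        graph.add_edge(endpoint, name)
    # 3. A simple step has one exit point: itself.
    return [name]


def noop(state: dict[str, Any]) -> None:
    return None


graph = TinyGraph()
ends_a = add_simple_step(graph, "a", noop, [])
ends_b = add_simple_step(graph, "b", noop, [])
ends_c = add_simple_step(graph, "c", noop, ends_a + ends_b)  # fan-in from both
```

A step with conditional branching would instead return several exit points, which is the case where overriding add_to_graph is needed.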
apply_exclusions(exclusions)
Apply exclusions to this step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exclusions | ExclusionSet | The exclusion set to apply. | required |
execute(state, runtime)
abstractmethod
async
Executes the operation defined for this pipeline step.
This method should be implemented by subclasses to perform the actual processing or computation for this step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | PipelineState | The current state of the pipeline, containing all data. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for this step's execution. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | The update to the pipeline state after this step's operation. This should include only new or modified data produced by this step, not the entire state. Returns None if no state update is needed. |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If the subclass does not implement this method. |
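To make the "partial update, not the entire state" contract concrete, here is a minimal sketch. `WordCountStep` is a hypothetical class that only mimics the execute signature (a real step would subclass BasePipelineStep), the state is a plain dict, and the merge in `run_once` stands in for whatever the pipeline does with the returned update.

```python
from __future__ import annotations

import asyncio
from typing import Any


class WordCountStep:
    """Hypothetical step; a real one would subclass BasePipelineStep."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def execute(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        text = state.get("text")
        if not text:
            return None  # nothing to update
        # Return only the new key, not a copy of the whole state.
        return {"word_count": len(text.split())}


async def run_once(state: dict[str, Any]) -> dict[str, Any]:
    update = await WordCountStep("count").execute(state, runtime=None)
    # Assumption: the caller merges the partial update into the state.
    return {**state, **(update or {})}


new_state = asyncio.run(run_once({"text": "a small pipeline step"}))
empty_state = asyncio.run(run_once({"text": ""}))
```

Returning None for the empty input leaves the state unchanged, which matches the "no state update is needed" case in the table above.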
execute_direct(state, runtime)
async
Execute this step directly, bypassing graph-based execution.
This method is used when a step needs to be executed directly, such as in parallel execution. The default implementation calls _execute_with_error_handling for consistent error handling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | dict[str, Any] | The current state of the pipeline. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for this step's execution. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
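One way direct execution might be used for parallel work is to await several steps concurrently and merge their updates. The sketch below assumes hypothetical stand-ins (`EchoStep`, `run_parallel`) and a plain dict state; error handling, which the real method delegates to _execute_with_error_handling, is not modeled.

```python
from __future__ import annotations

import asyncio
from typing import Any


class EchoStep:
    """Hypothetical step exposing only execute_direct."""

    def __init__(self, name: str, key: str) -> None:
        self.name = name
        self.key = key

    async def execute_direct(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        return {self.key: f"{self.name} saw {state['query']}"}


async def run_parallel(steps: list[EchoStep], state: dict[str, Any]) -> dict[str, Any]:
    # Run every step against the same input state, without going through a graph.
    updates = await asyncio.gather(*(s.execute_direct(state, None) for s in steps))
    merged = dict(state)
    for update in updates:
        if update:  # skip steps that returned None
            merged.update(update)
    return merged


merged = asyncio.run(
    run_parallel([EchoStep("first", "a"), EchoStep("second", "b")], {"query": "hi"})
)
```

Because `asyncio.gather` preserves argument order, later steps win if two updates write the same key in this sketch.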
get_mermaid_diagram()
Generates a Mermaid diagram representation of the pipeline step.
This method provides a default implementation that can be overridden by subclasses to provide more detailed or specific diagrams.
It is recommended to implement this method for subclasses that have multiple connections to other steps.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | An empty string in the default implementation. |
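A subclass with several outgoing connections could override this method along the following lines. `BranchStep` and its `targets` attribute are hypothetical and exist only for this sketch; the output uses standard Mermaid flowchart syntax.

```python
class BranchStep:
    """Hypothetical step that connects to several target steps."""

    def __init__(self, name: str, targets: list[str]) -> None:
        self.name = name
        self.targets = targets

    def get_mermaid_diagram(self) -> str:
        # One Mermaid edge per outgoing connection, top-down layout.
        lines = ["graph TD"]
        lines += [f"    {self.name} --> {target}" for target in self.targets]
        return "\n".join(lines)


diagram = BranchStep("route", ["fast_path", "slow_path"]).get_mermaid_diagram()
```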