
Pipeline step

The base class for all pipeline steps.

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id), Henry Wicaksono (henry.wicaksono@gdplabs.id), Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

[1] https://langchain-ai.github.io/langgraph/

BasePipelineStep(name, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Bases: ABC

The base class for all pipeline steps.

A pipeline step represents a single operation or task within a larger processing pipeline. Each step provides:

1. `execute()`: performs the actual operation (must be implemented by subclasses).
2. `add_to_graph()`: integrates the step into the pipeline structure (optional; a default implementation is provided).

The default implementation of `add_to_graph` is suitable for steps that:

1. Have a single entry point.
2. Have a single exit point.
3. Connect to all previous endpoints.

For more complex graph structures (e.g., conditional branching), steps should override add_to_graph.
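The `execute` contract described above can be illustrated with a self-contained sketch. It does not import the real library: `SketchStep` is a hypothetical stand-in that mirrors only the documented `execute(state, runtime)` signature, `UppercaseStep` is a hypothetical subclass, and `runtime` is passed as `None` purely for illustration.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any


class SketchStep(ABC):
    """Hypothetical stand-in for BasePipelineStep; only the execute contract is modeled."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def execute(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        """Return a partial state update, or None if nothing changes."""


class UppercaseStep(SketchStep):
    """Hypothetical step that reads `text` and writes `text_upper`."""

    async def execute(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        text = state.get("text")
        if text is None:
            return None  # no state update needed
        # Return only the new key, not the entire state.
        return {"text_upper": text.upper()}


step = UppercaseStep("uppercase")
update = asyncio.run(step.execute({"text": "hello"}, runtime=None))
print(update)  # {'text_upper': 'HELLO'}
```

Because `execute` is abstract, instantiating `SketchStep` directly raises `TypeError`, which is how `ABC` enforces that subclasses provide the method.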

Examples:

1. Basic usage:

```python
step = MyCustomStep("my_step")
```

2. Adding step-level caching:

```python
step = MyCustomStep(
    "my_step",
    cache_store=cache_store,
    cache_config={"ttl": 1800},
)
```

3. Retry configuration:

```python
retry_config = RetryConfig(max_retries=3, backoff_factor=2)
step = MyCustomStep(
    "my_step",
    retry_config=retry_config,
)
```

4. Error handling:

```python
step = MyCustomStep(
    "my_step",
    error_handler=error_handler,
)
```

Attributes:

- `name` (`str`): A unique identifier for the pipeline step.
- `retry_policy` (`RetryPolicy | None`): Configuration for retry behavior using LangGraph's RetryPolicy.
- `cache_store` (`BaseCache | None`): The cache store used for caching step results, if configured.
- `is_cache_enabled` (`bool`): Property indicating whether caching is enabled for this step.

Initializes a new pipeline step.

Parameters:

- `name` (`str`, required): A unique identifier for the pipeline step.
- `retry_config` (`RetryConfig | None`, default `None`): Configuration for retry behavior using GLLM Core's RetryConfig. If None, no retry config is applied. The RetryConfig is automatically converted to LangGraph's RetryPolicy when needed for internal use. Note that `timeout` is not supported and is ignored.
- `error_handler` (`BaseStepErrorHandler | None`, default `None`): Strategy for handling errors during execution. If None, the RaiseStepErrorHandler is used.
- `cache_store` (`BaseCache | None`, default `None`): The cache store used to cache step results. If None, no caching is used.
- `cache_config` (`dict[str, Any] | None`, default `None`): Configuration for the cache store. Supported keys:
    1. `key_func`: A function that generates cache keys. If None, the cache instance uses its own key function.
    2. `name`: The name of the cache. If None, the name defaults to "step_{step_name}" (see Caching Mechanism).
    3. `ttl`: The time-to-live for the cache. If None, the cache has no TTL.
    4. `matching_strategy`: The strategy for matching cache keys. If None, the cache instance uses "exact".
    5. `matching_config`: Configuration for the matching strategy. If None, the cache instance uses its own default matching strategy configuration.
Caching Mechanism

When a cache_store is provided, the step's execution method is automatically wrapped with a cache decorator. This means:

1. Before execution, the cache is checked for an existing result based on the input parameters.
2. If a valid cached result exists, it is returned immediately.
3. If no cached result exists, the step executes normally and the result is cached.
4. Cache keys are generated from the step's input state and configuration.
5. The cache name defaults to "step_{step_name}" if not specified.
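The caching flow above can be sketched with a plain dictionary standing in for the cache store. This is a hypothetical illustration, not the library's cache decorator: `cached_execute`, the JSON-based key derivation, and the expiry-based validity check are assumptions chosen to mirror the five points listed.

```python
from __future__ import annotations

import asyncio
import json
import time
from typing import Any, Awaitable, Callable

ExecuteFn = Callable[[dict[str, Any], dict[str, Any]], Awaitable[Any]]


def cached_execute(step_name: str, store: dict[str, dict[str, tuple]],
                   ttl: float | None = None, name: str | None = None) -> Callable[[ExecuteFn], ExecuteFn]:
    """Hypothetical decorator factory that caches an async execute function."""
    cache_name = name or f"step_{step_name}"  # point 5: default cache name
    bucket = store.setdefault(cache_name, {})

    def decorator(fn: ExecuteFn) -> ExecuteFn:
        async def wrapper(state: dict[str, Any], config: dict[str, Any]) -> Any:
            # Point 4: derive the key from the input state and configuration.
            key = json.dumps({"state": state, "config": config}, sort_keys=True, default=str)
            entry = bucket.get(key)
            # Points 1 and 2: return a cached result if present and not expired.
            if entry is not None:
                expires_at, value = entry
                if expires_at is None or time.monotonic() < expires_at:
                    return value
            # Point 3: execute normally, then cache the result.
            value = await fn(state, config)
            expires_at = None if ttl is None else time.monotonic() + ttl
            bucket[key] = (expires_at, value)
            return value

        return wrapper

    return decorator


calls = 0
store: dict[str, dict[str, tuple]] = {}


@cached_execute("my_step", store, ttl=1800)
async def execute(state: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    global calls
    calls += 1
    return {"doubled": state["x"] * 2}


async def main() -> tuple[Any, Any]:
    first = await execute({"x": 21}, {})
    second = await execute({"x": 21}, {})  # served from the cache
    return first, second


first, second = asyncio.run(main())
print(first, second, calls)  # {'doubled': 42} {'doubled': 42} 1
```

The second call with an identical state and configuration never reaches the wrapped function, so the call counter stays at one.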

is_cache_enabled: bool property

Check if this step has caching enabled.

Returns:

- `bool`: True if caching is enabled, False otherwise.

is_excluded: bool property writable

Whether this step is excluded from execution/graph integration.

Returns:

- `bool`: True if the step is excluded, False otherwise.

__lshift__(other)

Combines this step with another step or pipeline using the '<<' operator.

Parameters:

- `other` (`BasePipelineStep | Pipeline`, required): The step or pipeline to add after this step.

Returns:

- `Pipeline`: A new pipeline with this step followed by the other step or pipeline.

__or__(other)

Combines this step with another step or pipeline.

This method allows for easy composition of pipeline steps using the | operator.

Parameters:

- `other` (`BasePipelineStep | Pipeline`, required): Another step or pipeline to combine with this one.

Returns:

- `Pipeline`: A new pipeline containing the combined steps.
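Composition with the `|` operator can be sketched using hypothetical stand-in classes. `Step` and `Pipeline` here are simplified placeholders, not the library's classes, and the sketch assumes the combined pipeline keeps left-to-right order and flattens a pipeline operand into its steps.

```python
from __future__ import annotations


class Pipeline:
    """Hypothetical stand-in: an ordered list of steps."""

    def __init__(self, steps: list[Step]) -> None:
        self.steps = steps

    def __or__(self, other: Step | Pipeline) -> Pipeline:
        other_steps = other.steps if isinstance(other, Pipeline) else [other]
        return Pipeline([*self.steps, *other_steps])


class Step:
    """Hypothetical stand-in for a pipeline step."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __or__(self, other: Step | Pipeline) -> Pipeline:
        # Build a new Pipeline; neither operand is modified.
        other_steps = other.steps if isinstance(other, Pipeline) else [other]
        return Pipeline([self, *other_steps])


pipeline = Step("retrieve") | Step("rerank") | Step("generate")
print([s.name for s in pipeline.steps])  # ['retrieve', 'rerank', 'generate']
```

Since `|` is left-associative, the first two steps form a pipeline and the third is appended through `Pipeline.__or__`.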

__rshift__(other)

Combines this step with another step or pipeline using the '>>' operator.

Parameters:

- `other` (`BasePipelineStep | Pipeline`, required): The step or pipeline to include this step in.

Returns:

- `Pipeline`: A new pipeline with this step included in the other step or pipeline.

add_to_graph(graph, previous_endpoints, retry_policy=None)

Integrates this step into the pipeline's internal structure.

This method is responsible for:

1. Adding the step's node(s) to the graph if not already present.
2. Creating edges from previous endpoints to this step's entry points.
3. Returning this step's exit points (endpoints).

This method provides a default implementation suitable for simple steps. Steps with more complex graph structures should override this method.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

Parameters:

- `graph` (`StateGraph`, required): The internal representation of the pipeline structure.
- `previous_endpoints` (`list[str]`, required): The endpoints from previous steps to connect to.
- `retry_policy` (`RetryPolicy | None`, default `None`): Configuration for retry behavior using LangGraph's RetryPolicy. If None, the step's own retry policy is used. If the step is not retryable, this parameter is ignored.

Returns:

- `list[str]`: The exit points (endpoints) of this step.
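The default behavior described above (add the node once, connect every previous endpoint, return the step's exit points) can be sketched against a minimal stand-in graph. `MiniGraph` is a hypothetical placeholder for LangGraph's StateGraph, `SimpleStep` is hypothetical, and the `retry_policy` fallback mirrors the parameter description; the real default implementation may differ in detail.

```python
from __future__ import annotations

from typing import Any


class MiniGraph:
    """Hypothetical stand-in for a StateGraph: records nodes and edges."""

    def __init__(self) -> None:
        self.nodes: dict[str, dict[str, Any]] = {}
        self.edges: list[tuple[str, str]] = []

    def add_node(self, name: str, fn: Any, retry_policy: Any = None) -> None:
        self.nodes[name] = {"fn": fn, "retry_policy": retry_policy}

    def add_edge(self, start: str, end: str) -> None:
        self.edges.append((start, end))


class SimpleStep:
    """Hypothetical step with a single entry point and a single exit point."""

    def __init__(self, name: str, retry_policy: Any = None) -> None:
        self.name = name
        self.retry_policy = retry_policy

    async def execute(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        return None

    def add_to_graph(self, graph: MiniGraph, previous_endpoints: list[str],
                     retry_policy: Any = None) -> list[str]:
        # Fall back to the step's own policy when none is passed in.
        policy = retry_policy if retry_policy is not None else self.retry_policy
        # 1. Add this step's node only if it is not already present.
        if self.name not in graph.nodes:
            graph.add_node(self.name, self.execute, retry_policy=policy)
        # 2. Connect every previous endpoint to this step's entry point.
        for endpoint in previous_endpoints:
            graph.add_edge(endpoint, self.name)
        # 3. The node itself is the single exit point.
        return [self.name]


graph = MiniGraph()
ends = SimpleStep("a").add_to_graph(graph, ["__start__"])
ends = SimpleStep("b").add_to_graph(graph, ends)
print(graph.edges)  # [('__start__', 'a'), ('a', 'b')]
```

Chaining works by feeding each call's returned endpoints into the next call, which is how a linear pipeline of simple steps would be wired; `"__start__"` is just a string label here.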

apply_exclusions(exclusions)

Apply exclusions to this step.

Parameters:

- `exclusions` (`ExclusionSet`, required): The exclusion set to apply.

execute(state, runtime) abstractmethod async

Executes the operation defined for this pipeline step.

This method should be implemented by subclasses to perform the actual processing or computation for this step.

Parameters:

- `state` (`PipelineState`, required): The current state of the pipeline, containing all data.
- `runtime` (`Runtime[dict[str, Any] | BaseModel]`, required): Runtime information for this step's execution.

Returns:

- `dict[str, Any] | None`: The update to the pipeline state after this step's operation. It should contain only the new or modified data produced by this step, not the entire state. Returns None if no state update is needed.

Raises:

- `NotImplementedError`: If the subclass does not implement this method.
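Because `execute` returns only the changed keys, the update has to be merged into the existing state. The sketch below assumes simple last-write-wins merging per key, which is LangGraph's behavior for state keys that have no reducer; `merge_update` is a hypothetical helper, and keys with reducers would combine values differently.

```python
from __future__ import annotations

from typing import Any


def merge_update(state: dict[str, Any], update: dict[str, Any] | None) -> dict[str, Any]:
    """Apply a partial update: keys in `update` overwrite, all other keys are kept."""
    if update is None:
        return dict(state)  # None means "no state update"
    return {**state, **update}


state = {"query": "What is RAG?", "context": None}
state = merge_update(state, {"context": ["doc-1", "doc-2"]})
state = merge_update(state, None)
print(state)  # {'query': 'What is RAG?', 'context': ['doc-1', 'doc-2']}
```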

execute_direct(state, runtime) async

Execute this step directly, bypassing graph-based execution.

This method is used when a step needs to be executed directly, such as in parallel execution. The default implementation calls _execute_with_error_handling for consistent error handling.

Parameters:

- `state` (`dict[str, Any]`, required): The current state of the pipeline.
- `runtime` (`Runtime[dict[str, Any] | BaseModel]`, required): Runtime information for this step's execution.

Returns:

- `dict[str, Any] | None`: Updates to apply to the pipeline state, or None if there are no updates.
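The error-handling path behind `execute_direct` can be sketched as follows. The handler classes are hypothetical stand-ins: `RaiseHandler` models the documented default of propagating the error (the role RaiseStepErrorHandler plays), while `FallbackHandler` and the `handle(error, state)` method are invented for illustration and are not the library's BaseStepErrorHandler API.

```python
from __future__ import annotations

import asyncio
from typing import Any


class RaiseHandler:
    """Stand-in for the default behavior: propagate the error."""

    def handle(self, error: Exception, state: dict[str, Any]) -> dict[str, Any] | None:
        raise error


class FallbackHandler:
    """Hypothetical handler that records the error as a state update instead of raising."""

    def handle(self, error: Exception, state: dict[str, Any]) -> dict[str, Any] | None:
        return {"error": f"{type(error).__name__}: {error}"}


class FlakyStep:
    """Hypothetical step whose execute always fails."""

    def __init__(self, name: str, error_handler: Any = None) -> None:
        self.name = name
        self.error_handler = error_handler if error_handler is not None else RaiseHandler()

    async def execute(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        raise ValueError("upstream service unavailable")

    async def _execute_with_error_handling(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        # Route every failure through the configured handler.
        try:
            return await self.execute(state, runtime)
        except Exception as error:
            return self.error_handler.handle(error, state)

    async def execute_direct(self, state: dict[str, Any], runtime: Any) -> dict[str, Any] | None:
        return await self._execute_with_error_handling(state, runtime)


result = asyncio.run(FlakyStep("s", FallbackHandler()).execute_direct({}, None))
print(result)  # {'error': 'ValueError: upstream service unavailable'}
```

With no handler supplied, the same call re-raises the `ValueError`, matching the documented default.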

get_mermaid_diagram()

Generates a Mermaid diagram representation of the pipeline step.

This method provides a default implementation that can be overridden by subclasses to provide more detailed or specific diagrams.

It is recommended to implement this method for subclasses that have multiple connections to other steps.

Returns:

- `str`: An empty string in the default implementation.
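A step with several outgoing connections might override `get_mermaid_diagram` to describe them. `RouterStep`, its branch names, and the `flowchart TD` layout are hypothetical; the sketch only shows the kind of Mermaid text such an override could return.

```python
class RouterStep:
    """Hypothetical conditional step that routes to one of several branches."""

    def __init__(self, name: str, branches: list[str]) -> None:
        self.name = name
        self.branches = branches

    def get_mermaid_diagram(self) -> str:
        # One labeled edge per branch, using Mermaid's `A -->|label| B` syntax.
        lines = ["flowchart TD"]
        for branch in self.branches:
            lines.append(f"    {self.name} -->|{branch}| {branch}")
        return "\n".join(lines)


diagram = RouterStep("router", ["search", "answer"]).get_mermaid_diagram()
print(diagram)
```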