Pipeline step
The base class for all pipeline steps.
References
[1] https://langchain-ai.github.io/langgraph/
BasePipelineStep(name)
Bases: ABC
The base class for all pipeline steps.
A pipeline step represents a single operation or task within a larger processing pipeline. Each step provides two methods:
1. execute(): performs the actual operation. Subclasses must implement it.
2. add_to_graph(): integrates the step with the pipeline structure. Overriding it is optional because a default implementation is provided.
The default implementation of add_to_graph is suitable for steps that:
1. Have a single entry point
2. Have a single exit point
3. Connect to all previous endpoints
For more complex graph structures (e.g., conditional branching), steps should override add_to_graph.
Attributes:
Name | Type | Description |
---|---|---|
name | str | A unique identifier for the pipeline step. |
Initializes a new pipeline step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | A unique identifier for the pipeline step. | required |
__lshift__(other)
Combines this step with another step or pipeline using the '<<' operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | BasePipelineStep \| Pipeline | The step or pipeline to add after this step. | required |
Returns:
Name | Type | Description |
---|---|---|
Pipeline | Pipeline | A new pipeline with this step followed by the other step or pipeline. |
__or__(other)
Combines this step with another step or pipeline.
This method allows for easy composition of pipeline steps using the | operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | BasePipelineStep \| Pipeline | Another step or pipeline to combine with this one. | required |
Returns:
Name | Type | Description |
---|---|---|
Pipeline | Pipeline | A new pipeline containing the combined steps. |
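The composition described above can be sketched with stand-in classes. `StepSketch`, `PipelineSketch`, and `_as_steps` are hypothetical names used only for illustration, not the library's classes, and the left-to-right ordering of the combined steps is an assumption.

```python
class PipelineSketch:
    """Stand-in for Pipeline: an ordered list of steps (assumption)."""

    def __init__(self, steps):
        self.steps = list(steps)

    def __or__(self, other):
        # Combining a pipeline with a step or pipeline yields a new pipeline.
        return PipelineSketch(self.steps + _as_steps(other))


class StepSketch:
    """Stand-in for BasePipelineStep that only carries a name."""

    def __init__(self, name):
        self.name = name

    def __or__(self, other):
        # Combining a step with a step or pipeline yields a new pipeline.
        return PipelineSketch([self] + _as_steps(other))


def _as_steps(obj):
    """Flatten a step or a pipeline into a list of steps."""
    return list(obj.steps) if isinstance(obj, PipelineSketch) else [obj]


load, clean, save = StepSketch("load"), StepSketch("clean"), StepSketch("save")
pipeline = load | clean | save
names = [step.name for step in pipeline.steps]
```

Returning a new object from each `|` leaves the original steps untouched, so the same step instance can take part in more than one composition.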
__rshift__(other)
Combines this step with another step or pipeline using the '>>' operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | BasePipelineStep \| Pipeline | The step or pipeline to include this step in. | required |
Returns:
Name | Type | Description |
---|---|---|
Pipeline | Pipeline | A new pipeline with this step included in the other step or pipeline. |
add_to_graph(graph, previous_endpoints)
Integrates this step into the pipeline's internal structure.
This method is responsible for:
1. Adding the step's node(s) to the graph if not already present
2. Creating edges from previous endpoints to this step's entry points
3. Returning this step's exit points (endpoints)
This method provides a default implementation suitable for simple steps. Steps with more complex graph structures should override this method.
This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph | StateGraph | The internal representation of the pipeline structure. | required |
previous_endpoints | list[str] | The endpoints from previous steps to connect to. | required |
Returns:
Type | Description |
---|---|
list[str] | The exit points (endpoints) of this step. |
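The three responsibilities listed for add_to_graph can be illustrated with a small sketch. It is not the library's implementation: `TinyGraph` is a hypothetical stand-in that only records nodes and edges, `StepSketch` and its `run` method are placeholders, and the `"start"` endpoint label is invented for the example.

```python
class TinyGraph:
    """Minimal stand-in for a StateGraph: records nodes and edges (assumption)."""

    def __init__(self):
        self.nodes = {}
        self.edges = []

    def add_node(self, name, action):
        self.nodes[name] = action

    def add_edge(self, start, end):
        self.edges.append((start, end))


class StepSketch:
    """Stand-in for a simple step with one entry point and one exit point."""

    def __init__(self, name):
        self.name = name

    def run(self, state):
        # Placeholder node action; a real step would do its work here.
        return None

    def add_to_graph(self, graph, previous_endpoints):
        # 1. Add the node if it is not already present.
        if self.name not in graph.nodes:
            graph.add_node(self.name, self.run)
        # 2. Connect every previous endpoint to this step's entry point.
        for endpoint in previous_endpoints:
            graph.add_edge(endpoint, self.name)
        # 3. Return the exit points; a simple step exits through itself.
        return [self.name]


graph = TinyGraph()
endpoints = ["start"]  # hypothetical label for the pipeline entry
for step in (StepSketch("load"), StepSketch("clean")):
    endpoints = step.add_to_graph(graph, endpoints)
```

Threading the returned endpoints into the next call is what chains the steps: each step only needs to know where the previous one ended.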
execute(state, config) (abstractmethod, async)
Executes the operation defined for this pipeline step.
This method should be implemented by subclasses to perform the actual processing or computation for this step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline, containing all data. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |
Returns:
Type | Description |
---|---|
dict[str, Any] \| None | The update to the pipeline state after this step's operation. It should contain only the new or modified data produced by this step, not the entire state. None if no state update is needed. |
Raises:
Type | Description |
---|---|
NotImplementedError | If the subclass does not implement this method. |
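A minimal sketch of a subclass implementing execute might look like the following. `StepSketch` is a simplified stand-in for the base class, `WordCountStep` is a hypothetical step, and the config is a plain dict here rather than a RunnableConfig; the final merge only illustrates how a partial update could be applied to the state.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class StepSketch(ABC):
    """Simplified stand-in for the base class (assumption, not the library class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def execute(
        self, state: dict[str, Any], config: dict[str, Any]
    ) -> Optional[dict[str, Any]]:
        """Return only the keys this step adds or changes, or None."""


class WordCountStep(StepSketch):
    """Hypothetical step that counts the words in state['text']."""

    async def execute(self, state, config):
        text = state.get("text")
        if text is None:
            return None  # no state update needed
        # Return a partial update, not a copy of the whole state.
        return {"word_count": len(text.split())}


state = {"text": "pipelines are made of steps"}
update = asyncio.run(WordCountStep("word_count").execute(state, config={}))
# Illustrative merge of the partial update into the existing state.
merged = {**state, **(update or {})}
```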
execute_direct(state, config) (async)
Execute this step directly, bypassing graph-based execution.
This method is used when a step needs to be executed directly, such as in parallel execution. The default implementation calls _execute_with_error_handling for consistent error handling.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | dict[str, Any] | The current state of the pipeline. | required |
config | RunnableConfig | Runtime configuration for this step's execution. | required |
Returns:
Type | Description |
---|---|
dict[str, Any] \| None | Updates to apply to the pipeline state, or None if no updates. |
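As a rough illustration of the parallel use case mentioned above, the sketch below runs two steps directly with asyncio.gather and merges their updates. All class names and the `run_parallel` helper are hypothetical, and the stand-in execute_direct simply delegates to execute instead of going through the library's error handling.

```python
import asyncio


class StepSketch:
    """Stand-in base class (assumption, not the library class)."""

    def __init__(self, name):
        self.name = name

    async def execute(self, state, config):
        raise NotImplementedError

    async def execute_direct(self, state, config):
        # Sketch only: delegate straight to execute, with no error handling.
        return await self.execute(state, config)


class UpperStep(StepSketch):
    async def execute(self, state, config):
        return {"upper": state["text"].upper()}


class LengthStep(StepSketch):
    async def execute(self, state, config):
        return {"length": len(state["text"])}


async def run_parallel(steps, state, config):
    """Run all steps concurrently on the same state and merge their updates."""
    updates = await asyncio.gather(
        *(step.execute_direct(state, config) for step in steps)
    )
    merged = dict(state)
    for update in updates:
        if update:  # skip steps that returned None
            merged.update(update)
    return merged


result = asyncio.run(
    run_parallel([UpperStep("upper"), LengthStep("length")], {"text": "abc"}, {})
)
```

Each step receives the same input state and returns only its own keys, so the updates can be merged without one step overwriting another as long as the keys differ.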
get_mermaid_diagram()
Generates a Mermaid diagram representation of the pipeline step.
This method provides a default implementation that can be overridden by subclasses to provide more detailed or specific diagrams.
It is recommended to implement this method for subclasses that have multiple connections to other steps.
Returns:
Name | Type | Description |
---|---|---|
str | str | An empty string in the default implementation. |
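An override for a step with several outgoing connections could look something like this sketch. `BranchStepSketch` and its `branches` mapping are hypothetical, and the exact diagram fragment the library expects (for example, whether a `graph` header is required) is not specified in this reference, so the output format is an assumption.

```python
class StepSketch:
    """Stand-in base class whose default diagram is an empty string."""

    def __init__(self, name):
        self.name = name

    def get_mermaid_diagram(self):
        return ""


class BranchStepSketch(StepSketch):
    """Hypothetical step that routes to one of several named targets."""

    def __init__(self, name, branches):
        super().__init__(name)
        self.branches = branches  # maps an edge label to a target step name

    def get_mermaid_diagram(self):
        # Emit one labelled Mermaid edge per branch.
        lines = [
            f"{self.name} -->|{label}| {target}"
            for label, target in self.branches.items()
        ]
        return "\n".join(lines)


router = BranchStepSketch("route", {"yes": "approve", "no": "reject"})
diagram = router.get_mermaid_diagram()
```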