Pipeline step

The base class for all pipeline steps.

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id)
Henry Wicaksono (henry.wicaksono@gdplabs.id)
Kadek Denaya (kadek.d.r.diana@gdplabs.id)

References

[1] https://langchain-ai.github.io/langgraph/

BasePipelineStep(name)

Bases: ABC

The base class for all pipeline steps.

A pipeline step represents a single operation or task within a larger processing pipeline. Each step provides two methods:
1. execute(): performs the actual operation. Subclasses must implement it.
2. add_to_graph(): integrates the step into the pipeline structure. Overriding it is optional because a default implementation is provided.

The default implementation of add_to_graph is suitable for steps that:
1. Have a single entry point
2. Have a single exit point
3. Connect to all previous endpoints

For more complex graph structures (e.g., conditional branching), steps should override add_to_graph.
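The subclassing contract described above can be sketched as follows. This is a minimal illustration that runs without the library installed: `SketchStep` is a hypothetical stand-in for BasePipelineStep, and `UppercaseStep` and the `text` state key are invented for the example. A plain dict stands in for the RunnableConfig argument.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class SketchStep(ABC):
    """Hypothetical stand-in for BasePipelineStep (not the library class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def execute(
        self, state: dict[str, Any], config: dict[str, Any]
    ) -> Optional[dict[str, Any]]:
        """Return a partial state update, or None if nothing changes."""


class UppercaseStep(SketchStep):
    """Illustrative step that upper-cases state["text"]."""

    async def execute(self, state, config):
        # Return only the key this step produces, not the whole state.
        return {"text": state["text"].upper()}


step = UppercaseStep("uppercase")
update = asyncio.run(step.execute({"text": "hello"}, {}))
print(step.name, update)
```

Only execute() is overridden here; a step like this has one entry and one exit point, so it would rely on the default add_to_graph behaviour.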

Attributes:

Name Type Description
name str

A unique identifier for the pipeline step.

Initializes a new pipeline step.

Parameters:

Name Type Description Default
name str

A unique identifier for the pipeline step.

required

__lshift__(other)

Combines this step with another step or pipeline using the '<<' operator.

Parameters:

Name Type Description Default
other BasePipelineStep | Pipeline

The step or pipeline to add after this step.

required

Returns:

Name Type Description
Pipeline Pipeline

A new pipeline with this step followed by the other step or pipeline.

__or__(other)

Combines this step with another step or pipeline.

This method allows for easy composition of pipeline steps using the | operator.

Parameters:

Name Type Description Default
other BasePipelineStep | Pipeline

Another step or pipeline to combine with this one.

required

Returns:

Name Type Description
Pipeline Pipeline

A new pipeline containing the combined steps.
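To illustrate how operator-based composition of this kind can work, here is a small self-contained sketch. `SketchStep` and `SketchPipeline` are hypothetical stand-ins, and the `steps` attribute and the list-based constructor are assumptions made for the example; the real Pipeline class is not documented in this section.

```python
class SketchPipeline:
    """Hypothetical stand-in for Pipeline: an ordered list of steps."""

    def __init__(self, steps: list) -> None:
        self.steps = list(steps)

    def __or__(self, other):
        # Appending a pipeline flattens it; appending a step adds one item.
        extra = other.steps if isinstance(other, SketchPipeline) else [other]
        return SketchPipeline(self.steps + extra)


class SketchStep:
    """Hypothetical stand-in for a pipeline step."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __or__(self, other):
        # step | step and step | pipeline both yield a new pipeline.
        extra = other.steps if isinstance(other, SketchPipeline) else [other]
        return SketchPipeline([self] + extra)


pipeline = SketchStep("retrieve") | SketchStep("rerank") | SketchStep("generate")
names = [step.name for step in pipeline.steps]
print(names)
```

Because each `|` returns a new pipeline object, the operands themselves are left unchanged in this sketch.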

__rshift__(other)

Combines this step with another step or pipeline using the '>>' operator.

Parameters:

Name Type Description Default
other BasePipelineStep | Pipeline

The step or pipeline to include this step in.

required

Returns:

Name Type Description
Pipeline Pipeline

A new pipeline with this step included in the other step or pipeline.

add_to_graph(graph, previous_endpoints)

Integrates this step into the pipeline's internal structure.

This method is responsible for:
1. Adding the step's node(s) to the graph if not already present
2. Creating edges from previous endpoints to this step's entry points
3. Returning this step's exit points (endpoints)

This method provides a default implementation suitable for simple steps. Steps with more complex graph structures should override this method.

This method is used by Pipeline to manage the pipeline's execution flow. It should not be called directly by users.

Parameters:

Name Type Description Default
graph StateGraph

The internal representation of the pipeline structure.

required
previous_endpoints list[str]

The endpoints from previous steps to connect to.

required

Returns:

Type Description
list[str]

The exit points (endpoints) of this step.
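The three responsibilities listed for add_to_graph can be sketched for the simple single-entry, single-exit case. `ToyGraph` is a hypothetical stand-in that only records nodes and edges; its `add_node` and `add_edge` names loosely mirror a StateGraph, and `add_simple_step` is an illustrative free function, not the library's implementation.

```python
class ToyGraph:
    """Hypothetical stand-in for a StateGraph: records nodes and edges."""

    def __init__(self) -> None:
        self.nodes: dict[str, object] = {}
        self.edges: list[tuple[str, str]] = []

    def add_node(self, name: str, action: object) -> None:
        self.nodes[name] = action

    def add_edge(self, start: str, end: str) -> None:
        self.edges.append((start, end))


def add_simple_step(
    graph: ToyGraph, name: str, action: object, previous_endpoints: list[str]
) -> list[str]:
    # 1. Add the node only if it is not already in the graph.
    if name not in graph.nodes:
        graph.add_node(name, action)
    # 2. Connect every previous endpoint to this step's single entry point.
    for endpoint in previous_endpoints:
        graph.add_edge(endpoint, name)
    # 3. The step itself is its only exit point.
    return [name]


graph = ToyGraph()
endpoints = add_simple_step(graph, "load", print, [])
endpoints = add_simple_step(graph, "clean", print, endpoints)
endpoints = add_simple_step(graph, "merge", print, ["load", "clean"])
print(graph.edges, endpoints)
```

A step with conditional branches would return several exit points instead of one, which is the case where overriding the method becomes necessary.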

execute(state, config) abstractmethod async

Executes the operation defined for this pipeline step.

This method should be implemented by subclasses to perform the actual processing or computation for this step.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline, containing all data.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

The update to the pipeline state after this step's operation. This should include new or modified data produced by this step, not the entire state. Returns None if no state update is needed.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.
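The return contract of execute() (a partial update, or None) can be illustrated with a small sketch. `count_words` is a hypothetical execute()-style coroutine, and `apply_update` assumes a shallow dict merge purely for illustration; how the pipeline actually merges updates into its state is not described in this section.

```python
import asyncio
from typing import Any, Optional


async def count_words(
    state: dict[str, Any], config: dict[str, Any]
) -> Optional[dict[str, Any]]:
    """Illustrative execute() body: returns only the key it produces."""
    if "text" not in state:
        return None  # no state update needed
    return {"word_count": len(state["text"].split())}


def apply_update(state: dict[str, Any], update: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Assumed merge rule for this sketch: shallow dict merge."""
    return state if update is None else {**state, **update}


state = {"text": "pipelines compose steps"}
state = apply_update(state, asyncio.run(count_words(state, {})))

unchanged = apply_update({"id": 1}, asyncio.run(count_words({"id": 1}, {})))
print(state, unchanged)
```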

execute_direct(state, config) async

Executes this step directly, bypassing graph-based execution.

This method is used when a step needs to be executed directly, such as in parallel execution. The default implementation calls _execute_with_error_handling for consistent error handling.

Parameters:

Name Type Description Default
state dict[str, Any]

The current state of the pipeline.

required
config RunnableConfig

Runtime configuration for this step's execution.

required

Returns:

Type Description
dict[str, Any] | None

Updates to apply to the pipeline state, or None if no updates.
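As a rough illustration of the parallel-execution use case mentioned above, the sketch below runs several steps concurrently by calling an execute_direct-style coroutine on each. `SketchStep` and `run_parallel` are hypothetical, and merging the updates with a plain dict update is an assumption; the library's own parallel step may behave differently.

```python
import asyncio
from typing import Any, Optional


class SketchStep:
    """Hypothetical stand-in exposing an execute_direct-style coroutine."""

    def __init__(self, name: str, key: str, value: Any) -> None:
        self.name = name
        self._key = key
        self._value = value

    async def execute_direct(
        self, state: dict[str, Any], config: dict[str, Any]
    ) -> Optional[dict[str, Any]]:
        await asyncio.sleep(0)  # yield control, as real I/O would
        return {self._key: self._value}


async def run_parallel(steps, state, config) -> dict[str, Any]:
    # Every branch receives the same input state; updates are merged afterwards.
    updates = await asyncio.gather(
        *(step.execute_direct(state, config) for step in steps)
    )
    merged: dict[str, Any] = {}
    for update in updates:
        if update is not None:
            merged.update(update)
    return merged


steps = [SketchStep("summarize", "summary", "short"), SketchStep("detect", "lang", "en")]
result = asyncio.run(run_parallel(steps, {"text": "hi"}, {}))
print(result)
```

`asyncio.gather` returns results in the order the awaitables were passed, so the merge order here is deterministic even though the steps run concurrently.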

get_mermaid_diagram()

Generates a Mermaid diagram representation of the pipeline step.

This method provides a default implementation that can be overridden by subclasses to provide more detailed or specific diagrams.

It is recommended to implement this method for subclasses that have multiple connections to other steps.

Returns:

Name Type Description
str str

An empty string in the default implementation.
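A subclass with multiple connections might override this method along the following lines. `BranchingStepSketch` and its `branches` attribute are hypothetical, and the sketch assumes the method returns Mermaid edge lines as a fragment; the exact format the pipeline expects is not stated in this section.

```python
class BranchingStepSketch:
    """Hypothetical step with one entry point and several branch targets."""

    def __init__(self, name: str, branches: list[str]) -> None:
        self.name = name
        self.branches = branches

    def get_mermaid_diagram(self) -> str:
        # One Mermaid edge per branch, e.g. "router --> search".
        return "\n".join(f"{self.name} --> {branch}" for branch in self.branches)


diagram = BranchingStepSketch("router", ["search", "answer"]).get_mermaid_diagram()
print(diagram)
```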