Pipeline
Defines the pipeline, which is a sequence of steps that are executed in order.
The Pipeline class is the main class for defining and executing pipelines. It provides a way to compose multiple steps into a sequence, and to execute them in a structured manner.
The Pipeline and the pipeline steps wrap the LangGraph [1] library, which provides a powerful way to define and execute complex workflows.
The Pipeline and its steps are composable using the `|`, `<<`, and `>>` operators.

The `|` operator is used to combine pipelines or steps:

1. `step1 | step2` will create a pipeline with `step1` and `step2` in sequence.
2. `pipeline1 | pipeline2` will create a pipeline with the steps from `pipeline1` followed by the steps from `pipeline2`.
3. `pipeline | step` will add the step to the end of the pipeline.
4. `step | pipeline` will create a new pipeline with the step added to the beginning of the pipeline.
The `<<` and `>>` operators are used to include pipelines or steps as subgraphs:

1. `pipeline1 << pipeline2` will embed `pipeline2` as a subgraph within `pipeline1`.
2. `pipeline1 >> pipeline2` will embed `pipeline1` as a subgraph within `pipeline2`.
3. `step << pipeline` and `pipeline >> step` will create a new pipeline which executes `step`, then `pipeline` as a subgraph.
4. `pipeline << step` and `step >> pipeline` are equivalent to `pipeline | step`.
5. `step1 << step2` and `step1 >> step2` are equivalent to `step1 | step2`.
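For illustration, a minimal composition sketch; the import path is hypothetical, and `retrieval_step`, `generation_step`, and `terminator_step` stand in for BasePipelineStep instances defined elsewhere:

```python
from my_rag_package.pipeline import Pipeline  # hypothetical import path

# `|` composes steps sequentially into a pipeline.
inner = retrieval_step | generation_step

# `<<` embeds the right-hand pipeline as a subgraph of the left-hand pipeline.
outer = Pipeline([terminator_step], name="outer") << inner

# `>>` is the mirrored form: `a >> b` is equivalent to `b << a`.
outer_alt = inner >> Pipeline([terminator_step], name="outer")
```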
References
[1] https://langchain-ai.github.io/langgraph/
Pipeline(steps, state_type=RAGState, recursion_limit=30, name=None, cache_store=None, cache_config=None)
Represents a sequence of steps executed in order, forming a pipeline.
A pipeline can have zero or more steps. When a pipeline has no steps (an empty list), it acts as a pass-through pipeline that simply returns the input state unchanged. This is useful when using the pipe (`|`) operator to define the pipeline's state without requiring explicit steps.
Attributes:

| Name | Type | Description |
|---|---|---|
| `steps` | `list[BasePipelineStep]` | List of steps to be executed in the pipeline. Can be empty for a pass-through pipeline. |
| `state_type` | `type` | The type of state used in the pipeline. Defaults to `RAGState`. |
| `recursion_limit` | `int` | The maximum number of steps allowed. |
| `name` | `str \| None` | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. |
Usage examples
Basic pipeline with steps:

```python
pipeline = Pipeline([retrieval_step, generation_step, terminator_step])
```

Empty pipeline (pass-through):

```python
pipeline = Pipeline([])
pipeline = Pipeline(None)
```

Pipeline with custom state type:

```python
from typing import TypedDict

class CustomState(TypedDict):
    user_query: str
    context: str
    response: str

pipeline = Pipeline([retrieval_step, generation_step], state_type=CustomState)
```

Named pipeline for subgraph usage:

```python
pipeline = Pipeline([retrieval_step, generation_step], name="rag_pipeline")
```

Pipeline with caching:

```python
pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={"ttl": 3600, "name": "rag_cache"},
)
```

Using the pipe (`|`) operator to combine steps:

```python
pipeline = retrieval_step | generation_step | terminator_step
```

Using the pipe (`|`) operator to combine a step with a pipeline:

```python
pipeline = Pipeline([retrieval_step, generation_step]) | terminator_step
```

Using the pipe (`|`) operator to combine pipelines:

```python
pipeline1 = Pipeline([retrieval_step])
pipeline2 = Pipeline([generation_step, terminator_step])
combined_pipeline = pipeline1 | pipeline2
```
Initializes the Pipeline with the given steps and state type.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `steps` | `list[BasePipelineStep] \| None` | List of steps to be executed in the pipeline. Defaults to None, in which case the steps will be an empty list (a pass-through pipeline). | required |
| `state_type` | `type` | The type of state to be used. Defaults to RAGState. | `RAGState` |
| `recursion_limit` | `int` | The maximum number of steps allowed. Defaults to 30. | `30` |
| `name` | `str \| None` | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | `None` |
| `cache_store` | `BaseCache \| None` | The cache store to use for caching pipeline results. Defaults to None. If None, no caching will be used. | `None` |
| `cache_config` | `dict[str, Any] \| None` | Configuration for the cache store: 1. `key_func`: a function to generate cache keys; if None, the cache instance will use its own key function. 2. `name`: the name of the cache; if None, the cache instance will use its own name. 3. `ttl`: the time-to-live for the cache; if None, the cache will not have a TTL. 4. `matching_strategy`: the strategy for matching cache keys; if None, the cache instance will use "exact". 5. `matching_config`: configuration for the matching strategy; if None, the cache instance will use its own default matching strategy configuration. | `None` |
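As a sketch of how these keys might be supplied together (the key function and the state field it reads are illustrative placeholders, and `cache_store`, `retrieval_step`, and `generation_step` are assumed to exist):

```python
def query_cache_key(state: dict) -> str:
    # Illustrative only: derive a cache key from an assumed "query" field.
    return state["query"].strip().lower()

pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,               # a BaseCache implementation
    cache_config={
        "key_func": query_cache_key,       # custom cache-key generation
        "name": "rag_cache",               # cache name
        "ttl": 3600,                       # time-to-live (assumed to be seconds)
        "matching_strategy": "exact",      # the documented default when omitted
    },
)
```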
graph: StateGraph
property
The graph representation of the pipeline.
If the graph doesn't exist yet, it will be built automatically.
Returns:

| Name | Type | Description |
|---|---|---|
| StateGraph | `StateGraph` | The graph representation of the pipeline. |
state_type: type
property
writable
The current state type of the pipeline.
Returns:

| Name | Type | Description |
|---|---|---|
| type | `type` | The current state type. |
__lshift__(other)
Includes another pipeline or step using the '<<' operator.
This allows for easy composition:

- If `other` is a Pipeline: it becomes a subgraph within this pipeline.
- If `other` is a BasePipelineStep: it is added directly to this pipeline's steps.

The syntax `pipeline1 << pipeline2` visually indicates `pipeline2` being inserted into `pipeline1`. The syntax `pipeline << step` adds the step to the pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The pipeline to include as a subgraph, or the step to add. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | `Pipeline` | A new pipeline with the other pipeline included as a subgraph step, or with the step added. |
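A brief sketch of both forms, assuming `retrieval_step`, `generation_step`, and `terminator_step` are existing steps:

```python
main_pipeline = Pipeline([retrieval_step], name="main")
sub_pipeline = Pipeline([generation_step, terminator_step], name="generation")

# Pipeline << Pipeline: `sub_pipeline` becomes a subgraph step inside `main_pipeline`.
with_subgraph = main_pipeline << sub_pipeline

# Pipeline << BasePipelineStep: the step is appended, like `main_pipeline | terminator_step`.
with_step = main_pipeline << terminator_step
```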
__or__(other)
Combines the current pipeline with another pipeline or step using the '|' operator.
When combining two pipelines, the state types must match.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The other pipeline or step to combine with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | `Pipeline` | A new pipeline consisting of the combined steps. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the state types of the pipelines do not match. |
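For example, combining pipelines whose state types differ raises `ValueError`; this sketch reuses the `CustomState` TypedDict and placeholder steps from the usage examples above:

```python
default_pipeline = Pipeline([retrieval_step])                           # uses RAGState
custom_pipeline = Pipeline([generation_step], state_type=CustomState)   # uses CustomState

try:
    combined = default_pipeline | custom_pipeline
except ValueError as exc:
    # Raised because the two pipelines' state types do not match.
    print(f"Cannot combine pipelines: {exc}")
```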
__rshift__(other)
Includes this pipeline as a subgraph in another context using the '>>' operator.
This allows for easy composition:

- If `other` is a Pipeline: this pipeline becomes a subgraph within the other pipeline.
- If `other` is a BasePipelineStep: a new pipeline is created with the step, and this pipeline is included as a subgraph within that pipeline.

The syntax `pipeline1 >> pipeline2` embeds `pipeline1` as a subgraph within `pipeline2` (equivalent to `pipeline2 << pipeline1`). The syntax `pipeline >> step` creates a new pipeline with the step and includes this pipeline as a subgraph within that pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The pipeline to include this pipeline in as a subgraph, or a step to create a new pipeline with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | `Pipeline` | A new pipeline with this pipeline included as a subgraph. |
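A short sketch of both forms, using the same placeholder steps as above:

```python
retrieval_pipeline = Pipeline([retrieval_step], name="retrieval")
generation_pipeline = Pipeline([generation_step, terminator_step], name="generation")

# Pipeline >> Pipeline: embed `retrieval_pipeline` as a subgraph of `generation_pipeline`,
# equivalent to `generation_pipeline << retrieval_pipeline`.
combined = retrieval_pipeline >> generation_pipeline

# Pipeline >> BasePipelineStep: a new pipeline is created with the step, and
# `retrieval_pipeline` is included in it as a subgraph.
combined_with_step = retrieval_pipeline >> terminator_step
```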
build_graph()
Builds the graph representation of the pipeline by connecting the steps.
get_mermaid_diagram()
Generate a Mermaid diagram representation of the pipeline.
Returns:

| Name | Type | Description |
|---|---|---|
| str | `str` | The complete Mermaid diagram representation. |
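For instance, to inspect a pipeline's structure (a sketch; the exact diagram text depends on the steps involved):

```python
pipeline = Pipeline([retrieval_step, generation_step, terminator_step])

# Render the pipeline as Mermaid text, e.g. for embedding in Markdown documentation.
diagram = pipeline.get_mermaid_diagram()
print(diagram)
```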
invoke(initial_state, config=None)
async
Runs the pipeline asynchronously with the given initial state and configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `initial_state` | `dict[str, Any]` | The initial state to start the pipeline with. | required |
| `config` | `dict[str, Any]` | Additional configuration for the pipeline. User-defined config keys should not use the "langraph_" prefix, which is reserved for internal use. Defaults to None. | `None` |
Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | The final state after the pipeline execution. If 'debug_state' is set to True in the config, the state logs will be included in the final state under the key 'state_logs'. |
Raises:

| Type | Description |
|---|---|
| `BaseInvokerError` | If an error occurs during LM invocation. |
| `CancelledError` | If the execution is cancelled; preserved with added context. |
| `TimeoutError` | If the execution times out; preserved with added context. |
| `RuntimeError` | If an error occurs during pipeline execution. If the error is due to a step execution, the step name will be included in the error message. |
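A minimal invocation sketch; the state field names ("query", "response") are illustrative assumptions rather than part of this API, while `debug_state` and `state_logs` are the keys documented above:

```python
import asyncio

async def main() -> None:
    pipeline = Pipeline([retrieval_step, generation_step, terminator_step])

    final_state = await pipeline.invoke(
        {"query": "What does the Pipeline class do?"},  # initial_state; field name is illustrative
        config={"debug_state": True},                   # include state logs in the result
    )

    print(final_state.get("response"))     # assumed output field
    print(final_state.get("state_logs"))   # present only when debug_state is True

asyncio.run(main())
```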