Pipeline
Modules concerning the pipeline orchestration of Gen AI applications.
Pipeline(steps=None, state_type=RAGState, input_type=None, output_type=None, context_schema=None, recursion_limit=30, name=None, cache_store=None, cache_config=None)
Represents a sequence of steps executed in order, forming a pipeline.
A pipeline can have zero or more steps. When a pipeline has no steps (an empty list), it acts as a pass-through pipeline that simply returns the input state unchanged. This is useful when using the pipe (|) operator to define a pipeline's state without requiring explicit steps.
Attributes:

| Name | Type | Description |
|---|---|---|
| steps | list[BasePipelineStep] | List of steps to be executed in the pipeline. Can be empty for a pass-through pipeline. |
| state_type | type | The type of state used in the pipeline. Defaults to RAGState. |
| recursion_limit | int | The maximum number of steps allowed. |
| name | str \| None | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. |
| exclusions | ExclusionManager | The exclusion manager for this pipeline. |
Usage examples

Basic pipeline with steps

```python
pipeline = Pipeline([retrieval_step, generation_step, terminator_step])
```

Empty pipeline (pass-through)

```python
pipeline = Pipeline([])
pipeline = Pipeline(None)
```

Pipeline with custom state type

```python
class CustomState(TypedDict):
    user_query: str
    context: str
    response: str

pipeline = Pipeline([retrieval_step, generation_step], state_type=CustomState)
```

Named pipeline for subgraph usage

```python
pipeline = Pipeline([retrieval_step, generation_step], name="rag_pipeline")
```

Pipeline with caching

```python
pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={"ttl": 3600, "name": "rag_cache"},
)
```

Using the pipe (|) operator to combine steps

```python
pipeline = retrieval_step | generation_step | terminator_step
```

Using the pipe (|) operator to combine a step with a pipeline

```python
pipeline = Pipeline([retrieval_step, generation_step]) | terminator_step
```

Using the pipe (|) operator to combine pipelines

```python
pipeline1 = Pipeline([retrieval_step])
pipeline2 = Pipeline([generation_step, terminator_step])
combined_pipeline = pipeline1 | pipeline2
```

Configure step exclusion after initialization (set-only)

```python
log_step = log(name="log_step", ...)
retrieval_step = step(name="retrieval_step", ...)
generation_step = step(name="generation_step", ...)
pipeline = Pipeline([log_step, retrieval_step, generation_step])
pipeline.exclusions.exclude("log_step")  # Skip logging step
```
Configure composite step exclusion

```python
log_step = log(name="log_step", ...)
retrieval_a_step = step(name="retrieval_a_step", ...)
retrieval_b_step = step(name="retrieval_b_step", ...)
parallel_step = parallel(
    {"retrieval_a": retrieval_a_step, "retrieval_b": retrieval_b_step},
    name="parallel_step",
)
pipeline = Pipeline([log_step, parallel_step])
pipeline.exclusions.exclude("parallel_step")  # Skip the entire parallel step
pipeline.exclusions.exclude("parallel_step.retrieval_a")  # Skip retrieval_a step
pipeline.exclusions.exclude("parallel_step.retrieval_b")  # Skip retrieval_b step
```
Initializes the Pipeline with the given steps and state type.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| steps | list[BasePipelineStep] \| None | List of steps to be executed in the pipeline. Defaults to None, in which case the pipeline has no steps and acts as a pass-through. | None |
| state_type | TypedDict \| type[BaseModel] | The type of the pipeline's overall internal state. Can be a TypedDict or a Pydantic BaseModel. Defaults to RAGState. | RAGState |
| input_type | TypedDict \| type[BaseModel] \| None | The type of the pipeline's input state. This state should be compatible with the pipeline's state type. Defaults to None. | None |
| output_type | TypedDict \| type[BaseModel] \| None | The type of the pipeline's output state. This state should be compatible with the pipeline's state type. Defaults to None. | None |
| context_schema | TypedDict \| type[BaseModel] \| None | The type of the pipeline's runtime context. Defaults to None, in which case no context schema is used. | None |
| recursion_limit | int | The maximum number of steps allowed. Defaults to 30. | 30 |
| name | str \| None | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | None |
| cache_store | 'BaseCache' \| None | The cache store to use for caching pipeline results. Defaults to None, in which case no caching is used. | None |
| cache_config | dict[str, Any] \| None | Configuration for the cache store; see the supported keys below. Defaults to None, in which case no cache configuration is used. | None |

The cache_config dictionary supports the following keys:

1. key_func (Callable | None, optional): A function to generate cache keys. Defaults to None, in which case the cache instance uses its own key function.
2. name (str | None, optional): The name of the cache. Defaults to None, in which case the cache instance uses its own name.
3. ttl (int | None, optional): The time-to-live for the cache. Defaults to None, in which case the cache has no TTL.
4. matching_strategy (str | None, optional): The strategy for matching cache keys. Defaults to None, in which case the cache instance uses "exact".
5. matching_config (dict[str, Any] | None, optional): Configuration for the matching strategy. Defaults to None, in which case the cache instance uses its own default matching strategy configuration.
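As an illustrative sketch, the cache_config keys described above can be combined into a single dictionary. The key function shown here is a hypothetical example (any callable accepted by the cache store could be used); the key names follow the list above:

```python
# Illustrative cache_config covering all five documented keys.
# The key_func below is a hypothetical example: it derives the cache
# key from the user query in the state.
cache_config = {
    "key_func": lambda state: state["user_query"],  # hypothetical key function
    "name": "rag_cache",
    "ttl": 3600,  # entries expire; omit or set None for no TTL
    "matching_strategy": "exact",
    "matching_config": None,  # fall back to the strategy's defaults
}
```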
composer: 'Composer'
property
Get a Composer instance that manages this pipeline.
The Composer provides a fluent API for building pipelines by chaining step-adding methods. It allows for easy composition of pipeline steps in a readable, chainable manner.
Returns:

| Name | Type | Description |
|---|---|---|
| Composer | 'Composer' | A composer instance that manages this pipeline. |
exclusions: 'ExclusionManager'
property
Get the exclusion manager for this pipeline.
Returns:

| Name | Type | Description |
|---|---|---|
| ExclusionManager | 'ExclusionManager' | The exclusion manager for this pipeline. |
graph: StateGraph
property
The graph representation of the pipeline.
If the graph doesn't exist yet, it will be built automatically.
Returns:

| Name | Type | Description |
|---|---|---|
| StateGraph | StateGraph | The graph representation of the pipeline. |
state_type: type
property
writable
The current state type of the pipeline.
Returns:

| Name | Type | Description |
|---|---|---|
| type | type | The current state type. |
__lshift__(other)
Includes another pipeline or step using the '<<' operator.
This allows for easy composition where:

- If `other` is a Pipeline: it becomes a subgraph within this pipeline
- If `other` is a BasePipelineStep: it's added directly to this pipeline's steps

The syntax `pipeline1 << pipeline2` visually indicates pipeline2 being inserted into pipeline1. The syntax `pipeline << step` adds the step to the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | Pipeline \| BasePipelineStep | The pipeline to include as a subgraph, or the step to add. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | 'Pipeline' | A new pipeline with the other pipeline included as a subgraph step or with the step added. |
__or__(other)
Combines the current pipeline with another pipeline or step using the '|' operator.
When combining two pipelines, the state types must match.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | Pipeline \| BasePipelineStep | The other pipeline or step to combine with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | 'Pipeline' | A new pipeline consisting of the combined steps. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the state types of the pipelines do not match. |
__rshift__(other)
Includes this pipeline as a subgraph in another context using the '>>' operator.
This allows for easy composition where:

- If `other` is a Pipeline: this pipeline becomes a subgraph within the other pipeline
- If `other` is a BasePipelineStep: a new pipeline is created with the step, and this pipeline is included as a subgraph within that pipeline

The syntax `pipeline1 >> pipeline2` embeds pipeline1 as a subgraph within pipeline2 (equivalent to `pipeline2 << pipeline1`). The syntax `pipeline >> step` creates a new pipeline with the step and includes this pipeline as a subgraph within that pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | Pipeline \| BasePipelineStep | The pipeline to include this pipeline in as a subgraph, or a step to create a new pipeline with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Pipeline | 'Pipeline' | A new pipeline with this pipeline included as a subgraph. |
build_graph()
Builds the graph representation of the pipeline by connecting the steps.
clear()
Clears the pipeline by resetting steps, graph, and app to their initial state.
This method resets the pipeline to an empty state, clearing all steps and invalidating any built graph or compiled app. Useful for reusing a pipeline instance with different configurations.
get_mermaid_diagram()
Generate a Mermaid diagram representation of the pipeline.
Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The complete Mermaid diagram representation. |
invoke(initial_state, config=None)
async
Runs the pipeline asynchronously with the given initial state and configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| initial_state | PipelineState | The initial state to start the pipeline with. This initial state should comply with the state type of the pipeline. | required |
| config | dict[str, Any] | Additional configuration for the pipeline. User-defined config keys should not have the "langraph_" prefix, as it is reserved for internal use. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | The final state after the pipeline execution. If 'debug_state' is set to True in the config, the state logs will be included in the final state with the key 'state_logs'. |

Raises:

| Type | Description |
|---|---|
| BaseInvokerError | If an error occurs during LM invocation. |
| CancelledError | If the execution is cancelled, preserved with added context. |
| TimeoutError | If the execution times out, preserved with added context. |
| RuntimeError | If an error occurs during pipeline execution. If the error is due to a step execution, the step name will be included in the error message. |
RAGState
Bases: TypedDict
A TypedDict representing the state of a Retrieval-Augmented Generation (RAG) pipeline.
This docstring documents the original intention of each of the attributes in the TypedDict. However, in practice, the attributes may be modified or extended to suit the specific requirements of the application. The TypedDict is used to enforce the structure of the state object.
Attributes:

| Name | Type | Description |
|---|---|---|
| user_query | str | The original query from the user. |
| queries | list[str] | A list of queries generated for retrieval. |
| retrieval_params | dict[str, Any] | Parameters used for the retrieval process. |
| chunks | list | A list of chunks retrieved from the knowledge base. |
| history | list[Any] | The history of the conversation or interaction. |
| context | str | The context information used for generating responses. |
| response_synthesis_bundle | dict[str, Any] | Data used for synthesizing the final response. |
| response | str | The generated response to the user's query. |
| references | str \| list[str] | References or sources used in generating the response. |
| event_emitter | EventEmitter | An event emitter instance for logging purposes. |
Example

```python
state = {
    "user_query": "What is machine learning?",
    "queries": ["machine learning definition", "ML basics"],
    "retrieval_params": {"top_k": 5, "threshold": 0.8},
    "chunks": [
        {"content": "Machine learning is...", "score": 0.95},
        {"content": "ML algorithms include...", "score": 0.87},
    ],
    "history": [
        {"role": "user", "contents": ["What is machine learning?"]},
        {"role": "assistant", "contents": ["Machine learning is a subset of artificial intelligence..."]},
    ],
    "context": "Retrieved information about ML",
    "response_synthesis_bundle": {"template": "informative"},
    "response": "Machine learning is a subset of artificial intelligence...",
    "references": ["source1.pdf", "article2.html"],
    "event_emitter": EventEmitter(),
}
```