Pipeline
Modules concerning the pipeline orchestration of Gen AI applications.
Pipeline(steps, state_type=RAGState, recursion_limit=30, name=None, cache_store=None, cache_config=None)
Represents a sequence of steps executed in order, forming a pipeline.
A pipeline can have zero or more steps. When a pipeline has no steps (an empty list), it acts as a pass-through pipeline that simply returns the input state unchanged. This is useful when using the pipe (`|`) operator to define the RAG state without requiring explicit steps.
Attributes:

| Name | Type | Description |
|---|---|---|
| `steps` | `list[BasePipelineStep]` | List of steps to be executed in the pipeline. Can be empty for a pass-through pipeline. |
| `state_type` | `type` | The type of state used in the pipeline. Defaults to `RAGState`. |
| `recursion_limit` | `int` | The maximum number of steps allowed. |
| `name` | `str \| None` | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. |
Usage examples

Basic pipeline with steps:

```python
pipeline = Pipeline([retrieval_step, generation_step, terminator_step])
```

Empty pipeline (pass-through):

```python
pipeline = Pipeline([])
pipeline = Pipeline(None)
```

Pipeline with custom state type:

```python
class CustomState(TypedDict):
    user_query: str
    context: str
    response: str

pipeline = Pipeline([retrieval_step, generation_step], state_type=CustomState)
```

Named pipeline for subgraph usage:

```python
pipeline = Pipeline([retrieval_step, generation_step], name="rag_pipeline")
```

Pipeline with caching:

```python
pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={"ttl": 3600, "name": "rag_cache"},
)
```

Using the pipe (`|`) operator to combine steps:

```python
pipeline = retrieval_step | generation_step | terminator_step
```

Using the pipe (`|`) operator to combine a step with a pipeline:

```python
pipeline = Pipeline([retrieval_step, generation_step]) | terminator_step
```

Using the pipe (`|`) operator to combine pipelines:

```python
pipeline1 = Pipeline([retrieval_step])
pipeline2 = Pipeline([generation_step, terminator_step])
combined_pipeline = pipeline1 | pipeline2
```
Initializes the Pipeline with the given steps and state type.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `steps` | `list[BasePipelineStep] \| None` | List of steps to be executed in the pipeline. May be `None` or an empty list, in which case the pipeline acts as a pass-through. | required |
| `state_type` | `type` | The type of state to be used. Defaults to `RAGState`. | `RAGState` |
| `recursion_limit` | `int` | The maximum number of steps allowed. Defaults to 30. | `30` |
| `name` | `str \| None` | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | `None` |
| `cache_store` | `BaseCache \| None` | The cache store to use for caching pipeline results. Defaults to None. If None, no caching will be used. | `None` |
| `cache_config` | `dict[str, Any] \| None` | Configuration for the cache store. Supported keys: `key_func`, a function to generate cache keys (if None, the cache instance uses its own key function); `name`, the name of the cache (if None, the cache instance uses its own name); `ttl`, the time-to-live for cached entries (if None, the cache has no TTL); `matching_strategy`, the strategy for matching cache keys (if None, "exact" is used); `matching_config`, configuration for the matching strategy (if None, the cache instance uses its own default matching configuration). | `None` |
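As an illustration, a fuller `cache_config` using every supported key might look like the following sketch; `make_cache_key` and `cache_store` are assumed placeholders, not part of the library.

```python
# Hedged sketch: a cache_config exercising all documented keys.
def make_cache_key(state: dict) -> str:
    # Assumed helper: derive a cache key from the incoming user query.
    return state["user_query"].strip().lower()

pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={
        "key_func": make_cache_key,    # custom key function
        "name": "rag_cache",           # cache name
        "ttl": 3600,                   # entries expire after one hour
        "matching_strategy": "exact",  # the documented default strategy
        "matching_config": None,       # fall back to the strategy's own defaults
    },
)
```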
graph: StateGraph
property

The graph representation of the pipeline.
If the graph doesn't exist yet, it will be built automatically.

Returns:

| Name | Type | Description |
|---|---|---|
| `StateGraph` | `StateGraph` | The graph representation of the pipeline. |
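For example (a minimal sketch, with `retrieval_step` and `generation_step` as placeholder steps), accessing the property builds the graph on first use:

```python
pipeline = Pipeline([retrieval_step, generation_step])
graph = pipeline.graph  # built automatically if it doesn't exist yet
```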
state_type: type
property writable

The current state type of the pipeline.

Returns:

| Name | Type | Description |
|---|---|---|
| `type` | `type` | The current state type. |
__lshift__(other)

Includes another pipeline or step using the `<<` operator.

This allows for easy composition where:

- If `other` is a Pipeline: it becomes a subgraph within this pipeline.
- If `other` is a BasePipelineStep: it is added directly to this pipeline's steps.

The syntax `pipeline1 << pipeline2` visually indicates pipeline2 being inserted into pipeline1.
The syntax `pipeline << step` adds the step to the pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The pipeline to include as a subgraph, or the step to add. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `Pipeline` | `Pipeline` | A new pipeline with the other pipeline included as a subgraph step, or with the step added. |
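A hedged sketch of both forms, using the placeholder steps from the earlier examples:

```python
# Embed a sub-pipeline as a subgraph, then add a plain step.
retrieval_pipeline = Pipeline([retrieval_step], name="retrieval_subgraph")

main_pipeline = Pipeline([generation_step]) << retrieval_pipeline  # pipeline becomes a subgraph
main_pipeline = main_pipeline << terminator_step                   # step is added directly
```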
__or__(other)

Combines the current pipeline with another pipeline or step using the `|` operator.

When combining two pipelines, the state types must match.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The other pipeline or step to combine with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `Pipeline` | `Pipeline` | A new pipeline consisting of the combined steps. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the state types of the pipelines do not match. |
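To illustrate the state-type constraint, a hedged sketch reusing the `CustomState` example from above:

```python
# Pipelines with different state types cannot be combined with `|`.
rag_pipeline = Pipeline([retrieval_step])                              # state_type defaults to RAGState
custom_pipeline = Pipeline([generation_step], state_type=CustomState)

try:
    combined = rag_pipeline | custom_pipeline
except ValueError as err:
    print(f"Cannot combine pipelines: {err}")
```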
__rshift__(other)

Includes this pipeline as a subgraph in another context using the `>>` operator.

This allows for easy composition where:

- If `other` is a Pipeline: this pipeline becomes a subgraph within the other pipeline.
- If `other` is a BasePipelineStep: a new pipeline is created with the step, and this pipeline is included as a subgraph within that pipeline.

The syntax `pipeline1 >> pipeline2` embeds pipeline1 as a subgraph within pipeline2 (equivalent to `pipeline2 << pipeline1`).
The syntax `pipeline >> step` creates a new pipeline with the step and includes this pipeline as a subgraph within it.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `other` | `Pipeline \| BasePipelineStep` | The pipeline to include this pipeline in as a subgraph, or a step to create a new pipeline with. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `Pipeline` | `Pipeline` | A new pipeline with this pipeline included as a subgraph. |
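A hedged sketch mirroring the `<<` example, with the same placeholder steps:

```python
# Embed this pipeline as a subgraph of a larger pipeline, or under a single step.
retrieval_pipeline = Pipeline([retrieval_step], name="retrieval_subgraph")
outer_pipeline = Pipeline([generation_step, terminator_step])

combined = retrieval_pipeline >> outer_pipeline   # same as outer_pipeline << retrieval_pipeline
wrapped = retrieval_pipeline >> generation_step   # new pipeline around the step, with this one as a subgraph
```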
build_graph()
Builds the graph representation of the pipeline by connecting the steps.
get_mermaid_diagram()

Generate a Mermaid diagram representation of the pipeline.

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | The complete Mermaid diagram representation. |
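For instance (a minimal sketch; the returned string can be pasted into any Mermaid renderer):

```python
pipeline = Pipeline([retrieval_step, generation_step, terminator_step])
print(pipeline.get_mermaid_diagram())  # Mermaid source describing the pipeline graph
```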
invoke(initial_state, config=None)
async

Runs the pipeline asynchronously with the given initial state and configuration.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `initial_state` | `dict[str, Any]` | The initial state to start the pipeline with. | required |
| `config` | `dict[str, Any]` | Additional configuration for the pipeline. User-defined config should not use the "langraph_" prefix, as it is reserved for internal use. Defaults to None. | `None` |

Returns:

| Type | Description |
|---|---|
| `dict[str, Any]` | The final state after the pipeline execution. If `debug_state` is set to True in the config, the state logs will be included in the final state under the key `state_logs`. |

Raises:

| Type | Description |
|---|---|
| `BaseInvokerError` | If an error occurs during LM invocation. |
| `CancelledError` | If the execution is cancelled; preserved with added context. |
| `TimeoutError` | If the execution times out; preserved with added context. |
| `RuntimeError` | If an error occurs during pipeline execution. If the error is due to a step execution, the step name will be included in the error message. |
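A minimal usage sketch, assuming the placeholder steps from the earlier examples and that any state keys not supplied up front are filled in by the steps themselves:

```python
import asyncio

async def main() -> None:
    pipeline = Pipeline([retrieval_step, generation_step, terminator_step])
    final_state = await pipeline.invoke(
        {"user_query": "What is retrieval-augmented generation?"},
        config={"debug_state": True},  # per the Returns note, adds "state_logs" to the final state
    )
    print(final_state.get("response"))

asyncio.run(main())
```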
RAGState
Bases: TypedDict
A TypedDict representing the state of a Retrieval-Augmented Generation (RAG) pipeline.
This docstring documents the original intention of each of the attributes in the TypedDict. However, in practice, the attributes may be modified or extended to suit the specific requirements of the application. The TypedDict is used to enforce the structure of the state object.
Attributes:

| Name | Type | Description |
|---|---|---|
| `user_query` | `str` | The original query from the user. |
| `queries` | `list[str]` | A list of queries generated for retrieval. |
| `retrieval_params` | `dict[str, Any]` | Parameters used for the retrieval process. |
| `chunks` | `list` | A list of chunks retrieved from the knowledge base. |
| `history` | `str` | The history of the conversation or interaction. |
| `context` | `str` | The context information used for generating responses. |
| `response_synthesis_bundle` | `dict[str, Any]` | Data used for synthesizing the final response. |
| `response` | `str` | The generated response to the user's query. |
| `references` | `str \| list[str]` | References or sources used in generating the response. |
| `event_emitter` | `EventEmitter` | An event emitter instance for logging purposes. |
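A hedged sketch of an initial `RAGState`; since the attributes may be modified or extended per application, only the fields the pipeline's steps actually read need to be populated up front, and `event_emitter` is an assumed pre-constructed `EventEmitter` instance.

```python
initial_state: RAGState = {
    "user_query": "What is retrieval-augmented generation?",
    "queries": [],
    "retrieval_params": {},
    "chunks": [],
    "history": "",
    "context": "",
    "response_synthesis_bundle": {},
    "response": "",
    "references": [],
    "event_emitter": event_emitter,  # assumed placeholder EventEmitter instance
}
```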