Pipeline

Modules concerning the pipeline orchestration of Gen AI applications.

Pipeline(steps, state_type=RAGState, recursion_limit=30, name=None, cache_store=None, cache_config=None)

Represents a sequence of steps executed in order, forming a pipeline.

A pipeline can have zero or more steps. When a pipeline has no steps (an empty list), it acts as a pass-through pipeline that simply returns the input state unchanged. This is useful when using the pipe (|) operator to define a pipeline without requiring explicit steps.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| steps | list[BasePipelineStep] | List of steps to be executed in the pipeline. Can be empty for a pass-through pipeline. |
| state_type | type | The type of state used in the pipeline. Defaults to RAGState. |
| recursion_limit | int | The maximum number of steps allowed. |
| name | str \| None | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. |

Usage examples

Basic pipeline with steps

pipeline = Pipeline([retrieval_step, generation_step, terminator_step])

Empty pipeline (pass-through)

pipeline = Pipeline([])
pipeline = Pipeline(None)

Pipeline with custom state type

class CustomState(TypedDict):
    user_query: str
    context: str
    response: str

pipeline = Pipeline([retrieval_step, generation_step], state_type=CustomState)

Named pipeline for subgraph usage

pipeline = Pipeline([retrieval_step, generation_step], name="rag_pipeline")

Pipeline with caching

pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={"ttl": 3600, "name": "rag_cache"}
)

Using pipe (|) operator to combine steps

pipeline = retrieval_step | generation_step | terminator_step

Using pipe (|) operator to combine step with pipeline

pipeline = Pipeline([retrieval_step, generation_step]) | terminator_step

Using pipe (|) operator to combine pipelines

pipeline1 = Pipeline([retrieval_step])
pipeline2 = Pipeline([generation_step, terminator_step])
combined_pipeline = pipeline1 | pipeline2

Initializes the Pipeline with the given steps and state type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| steps | list[BasePipelineStep] \| None | List of steps to be executed in the pipeline. If None, the steps default to [] and the pipeline simply returns the input state unchanged. | required |
| state_type | type | The type of state to be used. Defaults to RAGState. | RAGState |
| recursion_limit | int | The maximum number of steps allowed. Defaults to 30. | 30 |
| name | str \| None | A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier. | None |
| cache_store | BaseCache \| None | The cache store to use for caching pipeline results. Defaults to None, in which case no caching is used. | None |
| cache_config | dict[str, Any] \| None | Configuration for the cache store. Supported keys: 1. key_func: a function to generate cache keys; if None, the cache instance uses its own key function. 2. name: the name of the cache; if None, the cache instance uses its own default name. 3. ttl: the time-to-live for cached entries; if None, the cache has no TTL. 4. matching_strategy: the strategy for matching cache keys; if None, "exact" is used. 5. matching_config: configuration for the matching strategy; if None, the cache instance uses its own default matching strategy configuration. | None |
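
For illustration, a fuller (hypothetical) cache configuration is sketched below, assuming a cache_store object that implements BaseCache; the dictionary keys follow the cache_config options listed above, and query_key_func is an illustrative placeholder, since the exact key function signature is not specified here.

def query_key_func(state):
    # Illustrative: cache results keyed by the user's query.
    return state["user_query"]

pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache_store=cache_store,
    cache_config={
        "key_func": query_key_func,    # custom cache key generation
        "name": "rag_cache",           # name of the cache
        "ttl": 3600,                   # entries expire after one hour
        "matching_strategy": "exact",  # match cache keys exactly
    },
)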

graph: StateGraph property

The graph representation of the pipeline.

If the graph doesn't exist yet, it will be built automatically.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| StateGraph | StateGraph | The graph representation of the pipeline. |

state_type: type property writable

The current state type of the pipeline.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| type | type | The current state type. |
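
A minimal sketch of using these properties, assuming the pipeline and CustomState objects from the usage examples above:

graph = pipeline.graph              # built automatically if it doesn't exist yet
print(pipeline.state_type)          # e.g. RAGState
pipeline.state_type = CustomState   # the property is writable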

__lshift__(other)

Includes another pipeline or step using the '<<' operator.

This allows for easy composition where:

- If 'other' is a Pipeline: it becomes a subgraph within this pipeline.
- If 'other' is a BasePipelineStep: it's added directly to this pipeline's steps.

The syntax pipeline1 << pipeline2 visually indicates pipeline2 being inserted into pipeline1. The syntax pipeline << step adds the step to the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | Pipeline \| BasePipelineStep | The pipeline to include as a subgraph or step to add. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Pipeline | 'Pipeline' | A new pipeline with the other pipeline included as a subgraph step or with the step added. |
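
A hypothetical composition sketch using '<<', reusing the step names from the usage examples above:

# Include another pipeline as a subgraph of this one.
sub_pipeline = Pipeline([retrieval_step], name="retrieval_subgraph")
pipeline = Pipeline([generation_step, terminator_step]) << sub_pipeline

# Add a single step directly to a pipeline's steps.
pipeline = Pipeline([retrieval_step]) << generation_step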

__or__(other)

Combines the current pipeline with another pipeline or step using the '|' operator.

When combining two pipelines, the state types must match.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | Pipeline \| BasePipelineStep | The other pipeline or step to combine with. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Pipeline | 'Pipeline' | A new pipeline consisting of the combined steps. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the state types of the pipelines do not match. |
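
A brief sketch of combining with '|', reusing the step names from the usage examples above; the ValueError branch assumes two pipelines built with different state types:

# Combine a pipeline with steps; the result is a new Pipeline.
pipeline = Pipeline([retrieval_step]) | generation_step | terminator_step

# Combining two pipelines requires matching state types.
rag_pipeline = Pipeline([retrieval_step])  # state_type defaults to RAGState
custom_pipeline = Pipeline([generation_step], state_type=CustomState)
try:
    combined = rag_pipeline | custom_pipeline
except ValueError:
    print("State types must match when combining pipelines")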

__rshift__(other)

Includes this pipeline as a subgraph in another context using the '>>' operator.

This allows for easy composition where:

- If 'other' is a Pipeline: this pipeline becomes a subgraph within the other pipeline.
- If 'other' is a BasePipelineStep: a new pipeline is created with the step, and this pipeline is included as a subgraph within that pipeline.

The syntax pipeline1 >> pipeline2 embeds pipeline1 as a subgraph within pipeline2 (equivalent to pipeline2 << pipeline1). The syntax pipeline >> step creates a new pipeline with the step, and includes this pipeline as a subgraph within that pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | Pipeline \| BasePipelineStep | The pipeline to include this pipeline in as a subgraph, or a step to create a new pipeline with. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Pipeline | 'Pipeline' | A new pipeline with this pipeline included as a subgraph. |
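
A hypothetical sketch of the '>>' direction, mirroring the '<<' example above:

# Embed this pipeline as a subgraph within another pipeline
# (pipeline1 >> pipeline2 is equivalent to pipeline2 << pipeline1).
retrieval_pipeline = Pipeline([retrieval_step], name="retrieval_subgraph")
outer_pipeline = Pipeline([generation_step, terminator_step])
combined = retrieval_pipeline >> outer_pipeline

# With a step, a new pipeline is created from the step and this
# pipeline is included in it as a subgraph.
combined = retrieval_pipeline >> generation_step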

build_graph()

Builds the graph representation of the pipeline by connecting the steps.

get_mermaid_diagram()

Generate a Mermaid diagram representation of the pipeline.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The complete Mermaid diagram representation. |
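
A minimal usage sketch, assuming the pipeline from the usage examples above:

diagram = pipeline.get_mermaid_diagram()
print(diagram)  # paste into a Mermaid renderer to visualize the pipeline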

invoke(initial_state, config=None) async

Runs the pipeline asynchronously with the given initial state and configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| initial_state | dict[str, Any] | The initial state to start the pipeline with. | required |
| config | dict[str, Any] | Additional configuration for the pipeline. User-defined config keys should not use the "langraph_" prefix, as it is reserved for internal use. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | The final state after the pipeline execution. If 'debug_state' is set to True in the config, the state logs will be included in the final state with the key 'state_logs'. |

Raises:

| Type | Description |
| --- | --- |
| BaseInvokerError | If an error occurs during LM invocation. |
| CancelledError | If the execution is cancelled, preserved with added context. |
| TimeoutError | If the execution times out, preserved with added context. |
| RuntimeError | If an error occurs during pipeline execution. If the error is due to a step execution, the step name will be included in the error message. |
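
A hypothetical invocation sketch, assuming the pipeline from the usage examples above and steps that populate the RAGState fields; the query string and config values are illustrative:

import asyncio

async def main():
    final_state = await pipeline.invoke(
        {"user_query": "What is retrieval-augmented generation?"},
        config={"debug_state": True},  # include state logs under "state_logs"
    )
    print(final_state["response"])
    print(final_state.get("state_logs"))

asyncio.run(main())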

RAGState

Bases: TypedDict

A TypedDict representing the state of a Retrieval-Augmented Generation (RAG) pipeline.

This docstring documents the original intention of each of the attributes in the TypedDict. However, in practice, the attributes may be modified or extended to suit the specific requirements of the application. The TypedDict is used to enforce the structure of the state object.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| user_query | str | The original query from the user. |
| queries | list[str] | A list of queries generated for retrieval. |
| retrieval_params | dict[str, Any] | Parameters used for the retrieval process. |
| chunks | list | A list of chunks retrieved from the knowledge base. |
| history | str | The history of the conversation or interaction. |
| context | str | The context information used for generating responses. |
| response_synthesis_bundle | dict[str, Any] | Data used for synthesizing the final response. |
| response | str | The generated response to the user's query. |
| references | str \| list[str] | References or sources used in generating the response. |
| event_emitter | EventEmitter | An event emitter instance for logging purposes. |
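
A brief sketch of seeding a pipeline run with a partial RAGState-shaped dictionary; only a few fields are set up front, on the assumption that the retrieval and generation steps fill in the rest:

initial_state = {
    "user_query": "How do I enable caching?",
    "queries": [],
    "retrieval_params": {"top_k": 5},  # illustrative retrieval parameters
    "history": "",
}
# final_state = await pipeline.invoke(initial_state)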