Skip to content

Types

Type definitions for the pipeline.

CacheConfig

Bases: BaseModel

Configuration for caching in pipeline steps and pipelines.

This model encapsulates the cache store instance and its key generation function. It provides a single parameter for configuring caching.

Examples:

from gllm_pipeline.types import CacheConfig
from gllm_datastore.cache.cache import Cache

# Basic usage with just the store
cache_config = CacheConfig(store=redis_cache)

# With custom key function
def custom_key_func(state: dict, runtime, config=None, **kwargs) -> str:
    user_id = state.get("user_id")
    return f"user:{user_id}"

cache_config = CacheConfig(
    store=redis_cache,
    key_func=custom_key_func
)

# With custom cache name
cache_config = CacheConfig(
    store=redis_cache,
    name="my_custom_cache"
)

Attributes:

Name Type Description
store Cache

The cache store instance to use for caching results.

name str | None

A custom name for the cache. If not provided, defaults to "step_{step_name}" for steps and "pipeline_{pipeline_name}" for pipelines.

key_func Callable[..., str] | None

A function to generate cache keys. If None, the cache instance will use its own default key generation.

Group(value) dataclass

A wrapper that keeps a resolved input intact during map-reduce fan-out.

Attributes:

Name Type Description
value str | Val

Wrapped lookup key or literal wrapper to preserve across fan-out.

__post_init__()

Validate that Group only wraps supported input-map specs.

Raises:

Type Description
TypeError

If the wrapped value is neither a lookup string nor a Val literal.

PipelineInvocation

Bases: BaseModel

A model representing the invocation parameters for a pipeline.

This model is used to validate the structure of data returned by pre_invoke_transform functions. It ensures the data has the correct format for pipeline invocation.

Examples:

invocation = PipelineInvocation(
    input={"user_query": "What is AI?"},
    config={"session_id": "abc123"}
)

# With no config
invocation = PipelineInvocation(input={"query": "test"})

Attributes:

Name Type Description
input dict[str, Any]

The input data to pass to the pipeline. This is a required field and must be a dictionary.

config dict[str, Any] | None

Optional configuration for the pipeline execution. Must be a dictionary if provided.

Val(value) dataclass

A value wrapper that represents a fixed/literal input.

Attributes:

Name Type Description
value object

The literal value of the input.