Utils

Utility functions for the GLLM pipeline modules.

ErrorContext

Bases: BaseModel

Standardized error context for pipeline steps.

This model provides a structured way to represent error context information for pipeline steps, ensuring consistency in error messages across the pipeline.

Attributes:

Name Type Description
exception BaseException

The exception that was raised.

step_name str

The name of the pipeline step where the error occurred.

step_type str

The type of pipeline step where the error occurred.

state PipelineState | None

The pipeline state at the time of the error. Defaults to None.

operation str

Description of the operation being performed when the error occurred. Defaults to "execution".

additional_context str | None

Additional context to include in the error message. Defaults to None.

__str__()

Convert the error context to a human-readable string.

Returns:

Name Type Description
str str

A formatted error message string.
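As a rough illustration of how these fields could combine into a message, the sketch below uses a plain dataclass named ErrorContextSketch (a hypothetical stand-in, not the library's pydantic model). The message layout is an assumption made for this example; the actual format produced by __str__() is not documented here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ErrorContextSketch:
    """Hypothetical stand-in that mirrors the documented attributes."""

    exception: BaseException
    step_name: str
    step_type: str
    state: dict[str, Any] | None = None
    operation: str = "execution"            # documented default
    additional_context: str | None = None   # documented default

    def __str__(self) -> str:
        # Assumed layout: the real message format may differ.
        message = (
            f"Error during {self.operation} of {self.step_type} "
            f"'{self.step_name}': {self.exception}"
        )
        if self.additional_context:
            message += f" ({self.additional_context})"
        return message


ctx = ErrorContextSketch(
    exception=ValueError("boom"),
    step_name="fetch_docs",
    step_type="retriever",
)
print(str(ctx))
```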

HasInputsMixin(input_map=None)

Mixin class for steps that consume a unified input_map.

The input_map maps argument names to either a state/config key (str) or a fixed value (Val). Resolution semantics:

1. For str specs: read from state first; if missing, fall back to config["configurable"].
2. For Val specs: use the literal value.
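The resolution rules above can be sketched as follows. This is a minimal illustration, not the mixin's implementation: the Val class here is a local stand-in whose `value` attribute is an assumption, resolve_inputs is a hypothetical helper name, and skipping keys found in neither state nor config is likewise assumed.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's Val wrapper around a literal."""

    value: Any


def resolve_inputs(input_map, state, config):
    """Resolve each argument: Val is literal, str reads state then config."""
    configurable = config.get("configurable", {})
    resolved = {}
    for arg_name, spec in input_map.items():
        if isinstance(spec, Val):
            resolved[arg_name] = spec.value          # fixed/literal value
        elif spec in state:
            resolved[arg_name] = state[spec]         # state is checked first
        elif spec in configurable:
            resolved[arg_name] = configurable[spec]  # fall back to config
        # Assumption: a key missing from both places is simply skipped.
    return resolved


example = resolve_inputs(
    {"query": "user_query", "top_k": "top_k", "lang": Val("en")},
    state={"user_query": "hello"},
    config={"configurable": {"top_k": 5}},
)
print(example)
```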

Attributes:

Name Type Description
input_map dict[str, str | Val]

Mapping of argument names to either a state/config key (str) or a fixed value (Val).

Initialize with a unified input_map.

Parameters:

Name Type Description Default
input_map dict[str, str | Val] | list[str | dict[str, str] | dict[str, Val]] | None

Mapping of argument names to either a state/config key (str) or a fixed value (Val). Also accepts a list form for ergonomics:

1. str: identity mapping ("key" -> {"key": "key"})
2. dict[str, str]: explicit mapping to state/config key
3. dict[str, Val]: fixed/literal value

Defaults to None, in which case an empty mapping is used.

None
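One way the list form could be normalized into a single dict is sketched below. The helper name normalize_input_map and the local Val stand-in are assumptions for illustration; the constructor's actual normalization logic may differ.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's Val wrapper."""

    value: Any


def normalize_input_map(input_map):
    """Turn the dict or list form into a plain dict; None becomes {}."""
    if input_map is None:
        return {}
    if isinstance(input_map, dict):
        return dict(input_map)
    normalized = {}
    for item in input_map:
        if isinstance(item, str):
            normalized[item] = item   # identity mapping: "key" -> {"key": "key"}
        else:
            normalized.update(item)   # explicit key mapping or Val literal
    return normalized


normalized = normalize_input_map(["query", {"k": "top_k"}, {"lang": Val("en")}])
print(normalized)
```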

from_legacy_map(input_state_map, runtime_config_map, fixed_args) classmethod

Synthesize an input_map from state/config mappings and fixed args.

Precedence: fixed_args > runtime_config_map > input_state_map.

Parameters:

Name Type Description Default
input_state_map dict[str, str] | None

Mapping of argument names to state keys. Defaults to None, in which case an empty mapping is used.

required
runtime_config_map dict[str, str] | None

Mapping of argument names to runtime configuration keys. Defaults to None, in which case an empty mapping is used.

required
fixed_args dict[str, Any] | None

Mapping of argument names to fixed values. Defaults to None, in which case an empty mapping is used.

required

Returns:

Type Description
dict[str, str | Val]

The synthesized input_map.
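The stated precedence can be obtained by merging the three mappings from lowest to highest priority, so that later entries overwrite earlier ones. The sketch below assumes this merge strategy and uses a local Val stand-in and a hypothetical function name; it is not the classmethod's actual code.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's Val wrapper."""

    value: Any


def synthesize_input_map(input_state_map=None, runtime_config_map=None, fixed_args=None):
    """Merge so that fixed_args > runtime_config_map > input_state_map."""
    merged = {}
    merged.update(input_state_map or {})      # lowest precedence
    merged.update(runtime_config_map or {})   # overrides state mappings
    # Fixed values are wrapped in Val so they are treated as literals.
    merged.update({name: Val(value) for name, value in (fixed_args or {}).items()})
    return merged


legacy = synthesize_input_map(
    input_state_map={"query": "user_query", "top_k": "state_top_k"},
    runtime_config_map={"top_k": "config_top_k", "lang": "language"},
    fixed_args={"lang": "en"},
)
print(legacy)
```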

ValidationError(message)

Bases: Exception

Exception raised for errors in state validation.

This exception is raised when input validation fails in pipeline steps. It provides more specific error information than a generic RuntimeError.

Attributes:

Name Type Description
message str

The error message explaining the validation failure.

Initialize with an error message.

Parameters:

Name Type Description Default
message str

The error message explaining what validation failed.

required

combine_mermaid_diagrams(base_diagram, step_diagrams)

Combine a base Mermaid diagram with step diagrams if they exist.

This is a common utility for Pipeline and various step classes that follow the same pattern of combining a base diagram with nested step diagrams.

Parameters:

Name Type Description Default
base_diagram str

The base Mermaid diagram representation.

required
step_diagrams str

The combined step diagrams content.

required

Returns:

Name Type Description
str str

The complete Mermaid diagram representation.
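A minimal sketch of the "combine if they exist" pattern is shown below. The function name combine_diagrams and the newline join are assumptions for illustration; the library may format the combined output differently.

```python
def combine_diagrams(base_diagram: str, step_diagrams: str) -> str:
    """Append step diagram content to the base diagram when there is any."""
    if not step_diagrams:
        return base_diagram  # nothing to add, keep the base unchanged
    # Assumption: the two parts are joined with a single newline.
    return f"{base_diagram}\n{step_diagrams}"


combined = combine_diagrams("graph TD\n    A --> B", "    B --> C")
print(combined)
```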

create_edge(graph, sources, target)

Create edges from source nodes to target node in the graph.

Special handling: START cannot participate in a fan-in. If START is among the sources, a direct START -> target edge is added separately and only the non-START sources are fanned in.

Parameters:

Name Type Description Default
graph StateGraph

The graph to add edges to.

required
sources str | list[str]

The source node or nodes. A str or a single-element list is connected directly. A list with more than one element is used for fan-in. An empty list is a no-op.

required
target str

The target node.

required
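The branching described above can be sketched against a small recording stub instead of a real StateGraph, so the example runs without LangGraph installed. RecordingGraph and create_edge_sketch are hypothetical names, and the local START constant is assumed to match langgraph.graph.START.

```python
# Assumed to match langgraph.graph.START; defined locally to stay standalone.
START = "__start__"


class RecordingGraph:
    """Hypothetical stub that records add_edge calls like a StateGraph would receive."""

    def __init__(self):
        self.edges = []

    def add_edge(self, start_key, end_key):
        self.edges.append((start_key, end_key))


def create_edge_sketch(graph, sources, target):
    """Connect sources to target, keeping START out of any fan-in."""
    if isinstance(sources, str):
        sources = [sources]
    if not sources:
        return  # empty list: do nothing
    if START in sources:
        graph.add_edge(START, target)  # direct START -> target edge
        sources = [source for source in sources if source != START]
        if not sources:
            return
    if len(sources) == 1:
        graph.add_edge(sources[0], target)    # single direct edge
    else:
        graph.add_edge(list(sources), target)  # fan-in from several nodes


graph = RecordingGraph()
create_edge_sketch(graph, [START, "a", "b"], "c")
print(graph.edges)
```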

create_error_context(exception, step_name, step_type, state, operation, **kwargs)

Create standardized error context for composite steps.

Parameters:

Name Type Description Default
exception Exception

The exception that occurred.

required
step_name str

Name of the step.

required
step_type str

Type of the step.

required
state dict[str, Any]

Pipeline state at time of error.

required
operation str

Operation being performed.

required
**kwargs dict[str, Any]

Additional context key-value pairs.

{}

Returns:

Name Type Description
ErrorContext ErrorContext

ErrorContext object with standardized information.

execute_callable(func, *args, **kwargs) async

Execute a callable function, handling both synchronous and asynchronous functions.

This utility function automatically detects whether the provided function is synchronous or asynchronous and executes it appropriately:

1. For async functions: calls them directly with await.
2. For sync functions: runs them in a thread pool to avoid blocking the event loop.

Parameters:

Name Type Description Default
func Callable[..., Any]

The function to execute. Can be either sync or async.

required
*args Any

Positional arguments to pass to the function.

()
**kwargs Any

Keyword arguments to pass to the function.

{}

Returns:

Name Type Description
Any Any

The result of the function execution.

Raises:

Type Description
Exception

Any exception raised by the function execution.
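The sync/async dispatch can be sketched with the standard library alone. The helper name run_callable is hypothetical, and using asyncio.to_thread for the thread pool is an assumption about the mechanism, not a statement of how the library does it.

```python
import asyncio
import inspect


async def run_callable(func, *args, **kwargs):
    """Await coroutine functions directly; run sync functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Assumption: the default executor stands in for the library's thread pool.
    return await asyncio.to_thread(func, *args, **kwargs)


def add(a, b):
    return a + b


async def add_async(a, b):
    await asyncio.sleep(0)  # yield to the event loop once
    return a + b


async def main():
    sync_result = await run_callable(add, 1, 2)
    async_result = await run_callable(add_async, 3, 4)
    return sync_result, async_result


results = asyncio.run(main())
print(results)
```

Exceptions raised inside either kind of function propagate to the caller of run_callable, which matches the Raises entry above.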

execute_sequential_steps(steps, state, runtime) async

Execute a sequence of steps sequentially and return accumulated state updates.

Each step receives the state as updated by the previous step. At the end, all state updates are merged into a single update dictionary.

Parameters:

Name Type Description Default
steps list[BasePipelineStep]

The steps to run sequentially.

required
state dict[str, Any]

The initial state to pass to the first step.

required
runtime Runtime[dict[str, Any] | BaseModel]

Runtime information for step execution.

required

Returns:

Type Description
dict[str, Any] | None

The accumulated state updates from all steps, or None if no updates.
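The accumulate-and-forward behavior can be sketched with stub steps. The `execute(state, runtime)` method name, the stub classes, and the helper name run_sequentially are assumptions for illustration; the real BasePipelineStep interface may differ.

```python
import asyncio


class UppercaseStep:
    """Hypothetical step that rewrites the 'text' key."""

    async def execute(self, state, runtime):
        return {"text": state["text"].upper()}


class CountStep:
    """Hypothetical step that reads the text updated by the previous step."""

    async def execute(self, state, runtime):
        return {"length": len(state["text"]), "seen": state["text"]}


async def run_sequentially(steps, state, runtime=None):
    """Run steps in order, feeding each one the state updated so far."""
    current_state = dict(state)  # copy so the caller's dict is not mutated
    updates = {}
    for step in steps:
        result = await step.execute(current_state, runtime)
        if result:
            current_state.update(result)  # visible to the next step
            updates.update(result)        # accumulated for the caller
    return updates or None


merged = asyncio.run(run_sequentially([UppercaseStep(), CountStep()], {"text": "hi"}))
print(merged)
```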

extract_step_diagrams(steps)

Extract and format Mermaid diagram content from a list of steps.

This function extracts the diagram content from each step in the pipeline. It removes the Mermaid header from each diagram and returns the remaining lines.

Parameters:

Name Type Description Default
steps list[BasePipelineStep]

A list of pipeline steps that may have get_mermaid_diagram methods.

required

Returns:

Type Description
list[str]

A list of Mermaid diagram lines from the steps.
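A minimal sketch of the header-stripping idea follows. It assumes the Mermaid header is the first line of each diagram (for example "graph TD") and that steps without a get_mermaid_diagram method are skipped; the helper name and stub classes are hypothetical.

```python
class RetrieveStep:
    """Hypothetical step that exposes a Mermaid diagram."""

    def get_mermaid_diagram(self):
        return "graph TD\n    A --> B\n    B --> C"


class PlainStep:
    """Hypothetical step without a diagram method."""


def extract_diagram_lines(steps):
    """Collect diagram lines from each step, dropping the header line."""
    lines = []
    for step in steps:
        get_diagram = getattr(step, "get_mermaid_diagram", None)
        if not callable(get_diagram):
            continue  # step has no diagram to contribute
        diagram = get_diagram()
        if not diagram:
            continue
        # Assumption: the header occupies exactly the first line.
        lines.extend(diagram.splitlines()[1:])
    return lines


diagram_lines = extract_diagram_lines([RetrieveStep(), PlainStep()])
print(diagram_lines)
```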

retry_config_to_langgraph_policy(retry_config)

Convert RetryConfig to LangGraph's RetryPolicy.

This function maps the GLLM Core's RetryConfig parameters to LangGraph's RetryPolicy format.

Parameters:

Name Type Description Default
retry_config RetryConfig | None

The GLLM Core's retry configuration. Defaults to None, in which case no retry config is applied.

required

Returns:

Type Description
RetryPolicy | None

The equivalent LangGraph retry policy, or None if retry_config is None.

Note

The conversion maps the following parameters:

1. max_retries + 1 -> max_attempts (LangGraph counts the first attempt as 1).
2. base_delay -> initial_interval.
3. max_delay -> max_interval.
4. exponential_base -> backoff_factor (using 2.0 as per SDK validation).
5. jitter -> jitter.
6. retry_on_exceptions -> retry_on.
7. timeout is not directly supported by LangGraph's RetryPolicy.
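The field mapping listed above can be sketched with two stand-in dataclasses so the example runs without GLLM Core or LangGraph installed. RetryConfigSketch, RetryPolicySketch, the helper name, and all default values are placeholders chosen for illustration, not the libraries' actual classes or defaults.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RetryConfigSketch:
    """Hypothetical stand-in for RetryConfig; defaults are placeholders."""

    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 10.0
    exponential_base: float = 2.0
    jitter: bool = True
    timeout: float | None = None
    retry_on_exceptions: tuple[type[Exception], ...] = (Exception,)


@dataclass
class RetryPolicySketch:
    """Hypothetical stand-in carrying the RetryPolicy field names listed above."""

    initial_interval: float
    backoff_factor: float
    max_interval: float
    max_attempts: int
    jitter: bool
    retry_on: tuple[type[Exception], ...]


def to_policy_sketch(retry_config):
    """Apply the documented mapping; None in, None out."""
    if retry_config is None:
        return None
    return RetryPolicySketch(
        initial_interval=retry_config.base_delay,
        backoff_factor=retry_config.exponential_base,  # the note above says 2.0 is used
        max_interval=retry_config.max_delay,
        max_attempts=retry_config.max_retries + 1,     # first attempt counts as 1
        jitter=retry_config.jitter,
        retry_on=retry_config.retry_on_exceptions,
        # timeout has no counterpart and is dropped.
    )


policy = to_policy_sketch(RetryConfigSketch(max_retries=3, base_delay=0.5))
print(policy)
```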