Utils
Utility functions for the GLLM pipeline modules.
ErrorContext
Bases: BaseModel
Standardized error context for pipeline steps.
This model provides a structured way to represent error context information for pipeline steps, ensuring consistency in error messages across the pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| exception | BaseException | The exception that was raised. |
| step_name | str | The name of the pipeline step where the error occurred. |
| step_type | str | The type of pipeline step where the error occurred. |
| state | PipelineState \| None | The pipeline state at the time of the error. Defaults to None. |
| operation | str | Description of the operation being performed when the error occurred. Defaults to "execution". |
| additional_context | str \| None | Additional context to include in the error message. Defaults to None. |
__str__()
Convert the error context to a human-readable string.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | A formatted error message string. |
HasInputsMixin(input_map=None)
Mixin class for steps that consume a unified input_map.
The input_map maps argument names to either a state/config key (str) or a fixed value (Val). Resolution semantics:
1. For str specs: read from state first; if missing, fall back to config["configurable"].
2. For Val specs: use the literal value.
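The resolution rules above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: the `Val` dataclass and the `resolve_inputs` helper are hypothetical stand-ins, and skipping keys found in neither state nor config is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's fixed-value wrapper."""

    value: Any


def resolve_inputs(input_map: dict, state: dict, config: dict) -> dict:
    """Resolve each argument according to the two rules described above."""
    configurable = config.get("configurable", {})
    resolved = {}
    for arg_name, spec in input_map.items():
        if isinstance(spec, Val):
            resolved[arg_name] = spec.value          # Val spec: literal value
        elif spec in state:
            resolved[arg_name] = state[spec]         # str spec: state first
        elif spec in configurable:
            resolved[arg_name] = configurable[spec]  # str spec: config fallback
        # Assumption: a key found in neither place is skipped.
    return resolved


state = {"query": "hello"}
config = {"configurable": {"top_k": 5}}
input_map = {"text": "query", "k": "top_k", "mode": Val("fast")}
resolved = resolve_inputs(input_map, state, config)
print(resolved)
```

Here `text` is read from state, `k` falls back to `config["configurable"]`, and `mode` takes its literal value.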
Attributes:
| Name | Type | Description |
|---|---|---|
| input_map | dict[str, str \| Val] | Mapping of argument names to either a state/config key (str) or a fixed value (Val). |
Initialize with a unified input_map.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_map | dict[str, str \| Val] \| list[str \| dict[str, str] \| dict[str, Val]] \| None | Mapping of argument names to either a state/config key (str) or a fixed value (Val). Also accepts a list form for ergonomics: (1) str: identity mapping ("key" -> {"key": "key"}); (2) dict[str, str]: explicit mapping to a state/config key; (3) dict[str, Val]: fixed/literal value. Defaults to None, in which case an empty mapping is used. | None |
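One way to picture the list form is as a normalization step that flattens it into the dict form. The sketch below is illustrative only: `normalize_input_map` and the `Val` dataclass are hypothetical names, and letting later list items overwrite earlier ones on a name clash is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's fixed-value wrapper."""

    value: Any


def normalize_input_map(input_map):
    """Flatten the accepted input_map forms into a single dict."""
    if input_map is None:
        return {}                      # None -> empty mapping
    if isinstance(input_map, dict):
        return dict(input_map)         # already in dict form
    normalized = {}
    for item in input_map:
        if isinstance(item, str):
            normalized[item] = item    # "key" -> {"key": "key"}
        else:
            normalized.update(item)    # dict[str, str] or dict[str, Val]
    return normalized


normalized = normalize_input_map(["query", {"k": "top_k"}, {"mode": Val("fast")}])
print(normalized)
```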
from_legacy_map(input_state_map, runtime_config_map, fixed_args)
classmethod
Synthesize an input_map from state/config mappings and fixed args.
Precedence: fixed_args > runtime_config_map > input_state_map.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_state_map | dict[str, str] \| None | Mapping of argument names to state keys. If None, an empty mapping is used. | required |
| runtime_config_map | dict[str, str] \| None | Mapping of argument names to runtime configuration keys. If None, an empty mapping is used. | required |
| fixed_args | dict[str, Any] \| None | Mapping of argument names to fixed values. If None, an empty mapping is used. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, str \| Val] | The synthesized input_map. |
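The stated precedence (fixed_args > runtime_config_map > input_state_map) can be expressed as successive dict updates, with the highest-precedence source applied last. The following is a sketch under assumptions, written as a plain function rather than the actual classmethod; `Val` is again a hypothetical stand-in, and wrapping each fixed value in `Val` is inferred from the return type.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Val:
    """Hypothetical stand-in for the library's fixed-value wrapper."""

    value: Any


def from_legacy_map(input_state_map, runtime_config_map, fixed_args):
    """Merge three legacy mappings; later updates win on a name clash."""
    merged = {}
    merged.update(input_state_map or {})     # lowest precedence
    merged.update(runtime_config_map or {})  # overrides state mappings
    # Highest precedence: fixed values, wrapped so they are used literally.
    merged.update({name: Val(value) for name, value in (fixed_args or {}).items()})
    return merged


legacy = from_legacy_map(
    {"a": "state_a", "b": "state_b"},
    {"b": "config_b", "c": "config_c"},
    {"c": 3},
)
print(legacy)
```

In this example `b` resolves to the config key and `c` to the fixed value, because each clashes with a lower-precedence source.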
ValidationError(message)
Bases: Exception
Exception raised for errors in state validation.
This exception is raised when input validation fails in pipeline steps. It provides more specific error information than a generic RuntimeError.
Attributes:
| Name | Type | Description |
|---|---|---|
| message | str | The error message explaining the validation failure. |
Initialize with an error message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The error message explaining what validation failed. | required |
combine_mermaid_diagrams(base_diagram, step_diagrams)
Combine a base Mermaid diagram with step diagrams if they exist.
This is a common utility for Pipeline and various step classes that follow the same pattern of combining a base diagram with nested step diagrams.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_diagram | str | The base Mermaid diagram representation. | required |
| step_diagrams | str | The combined step diagrams content. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The complete Mermaid diagram representation. |
create_edge(graph, sources, target)
Create edges from source nodes to target node in the graph.
Special handling: START cannot participate in a fan-in. If it is present, a direct START -> target edge is added separately and only the non-START sources are fanned in.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| graph | StateGraph | The graph to add edges to. | required |
| sources | str \| list[str] | The source node or nodes. A str or a single-element list is connected directly; a list with more than one element is used for fan-in; an empty list does nothing. | required |
| target | str | The target node. | required |
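The branching described above can be illustrated without a real graph. This sketch uses a hypothetical `RecordingGraph` that only records `add_edge` calls, and a local `START` string standing in for LangGraph's START sentinel; it approximates the documented behavior and is not the function's actual source.

```python
START = "__start__"  # stand-in for LangGraph's START sentinel


class RecordingGraph:
    """Hypothetical graph that records add_edge calls for inspection."""

    def __init__(self):
        self.edges = []

    def add_edge(self, source, target):
        self.edges.append((source, target))


def create_edge(graph, sources, target):
    """Connect sources to target, keeping START out of any fan-in."""
    if isinstance(sources, str):
        sources = [sources]
    if not sources:
        return                                   # empty list: do nothing
    if START in sources:
        graph.add_edge(START, target)            # START gets its own edge
        sources = [s for s in sources if s != START]
        if not sources:
            return
    if len(sources) == 1:
        graph.add_edge(sources[0], target)       # direct connection
    else:
        graph.add_edge(list(sources), target)    # fan-in from several nodes


graph = RecordingGraph()
create_edge(graph, [START, "a", "b"], "join")
print(graph.edges)
```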
create_error_context(exception, step_name, step_type, state, operation, **kwargs)
Create standardized error context for composite steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exception | Exception | The exception that occurred. | required |
| step_name | str | Name of the step. | required |
| step_type | str | Type of the step. | required |
| state | dict[str, Any] | Pipeline state at time of error. | required |
| operation | str | Operation being performed. | required |
| **kwargs | dict[str, Any] | Additional context key-value pairs. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| ErrorContext | ErrorContext | ErrorContext object with standardized information. |
execute_callable(func, *args, **kwargs)
async
Execute a callable function, handling both synchronous and asynchronous functions.
This utility function automatically detects whether the provided function is synchronous or asynchronous and executes it appropriately:
1. For async functions: calls them directly with await.
2. For sync functions: runs them in a thread pool to avoid blocking the event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[..., Any] | The function to execute. Can be either sync or async. | required |
| *args | Any | Positional arguments to pass to the function. | () |
| **kwargs | Any | Keyword arguments to pass to the function. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The result of the function execution. |
Raises:
| Type | Description |
|---|---|
| Exception | Any exception raised by the function execution. |
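The sync/async dispatch described above can be approximated with the standard library alone. This is a minimal sketch, assuming `inspect.iscoroutinefunction` for detection and `asyncio.to_thread` for the thread pool; the actual implementation may detect or schedule differently.

```python
import asyncio
import inspect
from typing import Any, Callable


async def execute_callable(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Await async functions directly; run sync functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # asyncio.to_thread keeps the event loop free while the sync call runs.
    return await asyncio.to_thread(func, *args, **kwargs)


def add(a, b):
    return a + b


async def multiply(a, b):
    return a * b


async def main():
    # Both kinds of callable are invoked through the same entry point.
    return (
        await execute_callable(add, 2, 3),
        await execute_callable(multiply, 2, b=3),
    )


results = asyncio.run(main())
print(results)
```

Exceptions raised inside `func` propagate to the caller in both branches, which matches the Raises entry above.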
execute_sequential_steps(steps, state, runtime)
async
Execute a sequence of steps sequentially and return accumulated state updates.
Each step receives the state as updated by the previous step. At the end, all state updates are merged into a single update dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| steps | list[BasePipelineStep] | The steps to run sequentially. | required |
| state | dict[str, Any] | The initial state to pass to the first step. | required |
| runtime | Runtime[dict[str, Any] \| BaseModel] | Runtime information for step execution. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | The accumulated state updates from all steps, or None if no updates. |
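The accumulate-and-merge behavior can be sketched as a simple loop. Everything about the step interface here is an assumption: the `AddOne` and `NoOp` classes and their async `execute(state, runtime)` method are hypothetical and do not reflect the real BasePipelineStep API.

```python
import asyncio


class AddOne:
    """Hypothetical step that increments a counter in the state."""

    async def execute(self, state, runtime):
        return {"count": state["count"] + 1}


class NoOp:
    """Hypothetical step that produces no state update."""

    async def execute(self, state, runtime):
        return None


async def execute_sequential_steps(steps, state, runtime):
    """Run steps in order, feeding each one the state updated so far."""
    current = dict(state)   # working copy seen by the next step
    updates = {}            # merged updates returned to the caller
    for step in steps:
        result = await step.execute(current, runtime)
        if result:
            current.update(result)
            updates.update(result)
    return updates or None  # None when no step produced an update


updates = asyncio.run(
    execute_sequential_steps([AddOne(), NoOp(), AddOne()], {"count": 0}, runtime=None)
)
empty = asyncio.run(execute_sequential_steps([NoOp()], {"count": 0}, runtime=None))
print(updates, empty)
```

The second `AddOne` sees `count == 1` because it receives the state already updated by the first.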
extract_step_diagrams(steps)
Extract and format Mermaid diagram content from a list of steps.
This function extracts the diagram content from each step in the pipeline, removing the Mermaid header and returning the remaining lines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| steps | list[BasePipelineStep] | A list of pipeline steps that may have get_mermaid_diagram methods. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of Mermaid diagram lines from the steps. |
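As a rough illustration of the header stripping, the sketch below assumes the Mermaid header is exactly the first line of each diagram (for example `graph TD`) and that steps without a `get_mermaid_diagram` method are skipped. The `DiagramStep` and `PlainStep` classes are hypothetical.

```python
class DiagramStep:
    """Hypothetical step exposing a Mermaid diagram."""

    def __init__(self, diagram):
        self._diagram = diagram

    def get_mermaid_diagram(self):
        return self._diagram


class PlainStep:
    """Hypothetical step with no diagram support."""


def extract_step_diagrams(steps):
    """Collect diagram body lines from every step that provides a diagram."""
    lines = []
    for step in steps:
        get_diagram = getattr(step, "get_mermaid_diagram", None)
        if get_diagram is None:
            continue                               # step has no diagram method
        diagram = get_diagram()
        if not diagram:
            continue                               # empty diagram: nothing to add
        lines.extend(diagram.splitlines()[1:])     # assumption: line 0 is the header
    return lines


diagram_lines = extract_step_diagrams(
    [DiagramStep("graph TD\n    A --> B\n    B --> C"), PlainStep()]
)
print(diagram_lines)
```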
retry_config_to_langgraph_policy(retry_config)
Convert RetryConfig to LangGraph's RetryPolicy.
This function maps the GLLM Core's RetryConfig parameters to LangGraph's RetryPolicy format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retry_config | RetryConfig \| None | The GLLM Core's retry configuration. If None, no retry policy is applied. | required |
Returns:
| Type | Description |
|---|---|
| RetryPolicy \| None | The equivalent LangGraph retry policy, or None if retry_config is None. |
Note
The conversion maps the following parameters:
1. max_retries + 1 -> max_attempts (LangGraph counts the first attempt as 1).
2. base_delay -> initial_interval.
3. max_delay -> max_interval.
4. exponential_base -> backoff_factor (using 2.0 as per SDK validation).
5. jitter -> jitter.
6. retry_on_exceptions -> retry_on.
7. timeout is not directly supported by LangGraph's RetryPolicy.
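The field mapping in the note can be written out as a small conversion function. To stay self-contained, this sketch uses two stand-in dataclasses rather than the real RetryConfig and RetryPolicy; their field names follow the note, while the default values are invented for illustration. Passing exponential_base through unchanged is also an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RetryConfig:
    """Stand-in for GLLM Core's RetryConfig (defaults are illustrative)."""

    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 10.0
    exponential_base: float = 2.0
    jitter: bool = True
    retry_on_exceptions: tuple = (Exception,)
    timeout: Optional[float] = None


@dataclass
class RetryPolicy:
    """Stand-in mirroring the RetryPolicy field names listed in the note."""

    initial_interval: float
    backoff_factor: float
    max_interval: float
    max_attempts: int
    jitter: bool
    retry_on: tuple


def retry_config_to_policy(retry_config):
    """Map a RetryConfig onto the RetryPolicy fields; None passes through."""
    if retry_config is None:
        return None
    return RetryPolicy(
        initial_interval=retry_config.base_delay,
        backoff_factor=retry_config.exponential_base,
        max_interval=retry_config.max_delay,
        max_attempts=retry_config.max_retries + 1,  # first attempt counts as 1
        jitter=retry_config.jitter,
        retry_on=retry_config.retry_on_exceptions,
        # timeout has no counterpart on the policy, so it is dropped here.
    )


policy = retry_config_to_policy(RetryConfig(max_retries=3, base_delay=0.5))
print(policy)
```

With `max_retries=3`, the resulting policy allows four attempts in total: the initial call plus three retries.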