Utils
Utility modules for use in the GLLM Core package.
BinaryHandlingStrategy
Bases: StrEnum
Enum for binary data handling options.
Attributes:
| Name | Type | Description |
|---|---|---|
SKIP |
str
|
Skip binary data. |
BASE64 |
str
|
Encode binary data to base64. |
HEX |
str
|
Encode binary data to hex. |
SHOW_SIZE |
str
|
Show the size of binary data. |
ChunkMetadataMerger(merger_func_map=None, default_merger_func=None, retained_keys=None)
A helper class to merge metadata from multiple chunks.
Attributes:
| Name | Type | Description |
|---|---|---|
merger_func_map |
dict[str, Callable[[list[Any]], Any]]
|
A mapping of metadata keys to merger functions. |
default_merger_func |
Callable[[list[Any]], Any]
|
The default merger function for metadata keys that are not present in the merger_func_map. |
retained_keys |
set[str] | None
|
The keys that should be retained in the merged metadata. If None, all intersection keys are retained. |
Initializes a new instance of the ChunkMetadataMerger class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
merger_func_map |
dict[str, Callable[[list[Any]], Any]] | None
|
A mapping of metadata keys to merger functions. Defaults to None, in which case a default merger map is used. The default merger map: 1. Picks the first value of the PREV_CHUNK_ID key. 2. Picks the last value of the NEXT_CHUNK_ID key. |
None
|
default_merger_func |
Callable[[list[Any]], Any] | None
|
The default merger for metadata keys that are not present in the merger_func_map. Defaults to None, in which case a default merger that picks the first value is used. |
None
|
retained_keys |
set[str] | None
|
The keys that should be retained in the merged metadata. Defaults to None, in which case all intersection keys are retained. |
None
|
merge(metadatas)
Merges metadata from multiple chunks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metadatas |
list[dict[str, Any]]
|
The metadata to merge. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
dict[str, Any]: The merged metadata. |
LoggerManager
A singleton class to manage logging configuration.
This class ensures that the root logger is initialized only once and is used across the application.
Basic usage
The LoggerManager can be used to get a logger instance as follows:
logger = LoggerManager().get_logger()
logger.info("This is an info message")
Set logging configuration
The LoggerManager also provides capabilities to set the logging configuration:
manager = LoggerManager()
manager.set_level(logging.DEBUG)
manager.set_log_format(custom_log_format)
manager.set_date_format(custom_date_format)
Add custom handlers
The LoggerManager also provides capabilities to add custom handlers to the root logger:
manager = LoggerManager()
handler = logging.FileHandler("app.log")
manager.add_handler(handler)
Extra error information
When logging errors, extra error information can be added as follows:
logger.error("I am dead!", extra={"error_code": "ERR_CONN_REFUSED"})
Logging modes
The LoggerManager supports three logging modes:
-
Text: Logs in a human-readable format with RichHandler column-based formatting. Used when the
LOG_FORMATenvironment variable is set to "text". Output example:log 2025-10-08T09:26:16 DEBUG [LoggerName] This is a debug message. 2025-10-08T09:26:17 INFO [LoggerName] This is an info message. 2025-10-08T09:26:18 WARNING [LoggerName] This is a warning message. 2025-10-08T09:26:19 ERROR [LoggerName] This is an error message. 2025-10-08T09:26:20 CRITICAL [LoggerName] This is a critical message. -
Simple: Logs in a human-readable format with Rich colors but without columns-based formatting. Used when the
LOG_FORMATenvironment variable is set to "simple". Output example:log [2025-10-08T09:26:16.123 LoggerName DEBUG] This is a debug message. [2025-10-08T09:26:17.456 LoggerName INFO] This is an info message. [2025-10-08T09:26:18.789 LoggerName WARNING] This is a warning message. [2025-10-08T09:26:19.012 LoggerName ERROR] This is an error message. [2025-10-08T09:26:20.345 LoggerName CRITICAL] This is a critical message. -
JSON: Logs in a JSON format, recommended for easy parsing due to the structured nature of the log records. Used when the
LOG_FORMATenvironment variable is set to "json". Output example:log {"timestamp": "2025-10-08T11:23:43+0700", "name": "LoggerName", "level": "DEBUG", "message": "..."} {"timestamp": "2025-10-08T11:23:44+0700", "name": "LoggerName", "level": "INFO", "message": "..."} {"timestamp": "2025-10-08T11:23:45+0700", "name": "LoggerName", "level": "WARNING", "message": "..."} {"timestamp": "2025-10-08T11:23:46+0700", "name": "LoggerName", "level": "ERROR", "message": "..."} {"timestamp": "2025-10-08T11:23:47+0700", "name": "LoggerName", "level": "CRITICAL", "message": "..."}
When the LOG_FORMAT environment is not set, the LoggerManager defaults to "text" mode.
__new__()
Initialize the singleton instance.
Returns:
| Name | Type | Description |
|---|---|---|
LoggerManager |
LoggerManager
|
The singleton instance. |
add_handler(handler)
Add a custom handler to the root logger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
handler |
Handler
|
The handler to add to the root logger. |
required |
get_logger(name=None)
Get a logger instance.
This method returns a logger instance that is a child of the root logger. If name is not provided, the root logger will be returned instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
str | None
|
The name of the child logger. If None, the root logger will be returned. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
Logger
|
logging.Logger: Configured logger instance. |
set_date_format(date_format)
Set date format for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
date_format |
str
|
The date format to set. |
required |
set_level(level)
Set logging level for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
level |
int
|
The logging level to set (e.g., logging.INFO, logging.DEBUG). |
required |
set_log_format(log_format)
Set logging format for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
log_format |
str
|
The log format to set. |
required |
MergerMethod
A collection of merger methods.
concatenate(delimiter='-')
staticmethod
Creates a function that concatenates a list of values with a delimiter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delimiter |
str
|
The delimiter to use when concatenating the values. Defaults to "-". |
'-'
|
Returns:
| Type | Description |
|---|---|
Callable[[list[Any]], str]
|
Callable[[list[Any]], str]: A function that concatenates a list of values with the delimiter. |
merge_overlapping_strings(delimiter='\n')
staticmethod
Creates a function that merges a list of strings, handling common prefixes and overlaps.
The created function will: - Identify and remove any common prefix shared by the strings. - Process each pair of adjacent strings to remove overlapping strings. - Join the cleaned strings together, including the common prefix at the beginning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delimiter |
str
|
The delimiter to use when merging the values. Defaults to "\n". |
'\n'
|
Returns:
| Type | Description |
|---|---|
Callable[[list[str]], str]
|
Callable[[list[str]], str]: A function that merges a list of strings, handling common prefixes and overlaps. |
pick_first(values)
staticmethod
Picks the first value from a list of values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
values |
list[Any]
|
The values to pick from. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Any |
Any
|
The first value from the list. |
pick_last(values)
staticmethod
Picks the last value from a list of values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
values |
list[Any]
|
The values to pick from. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Any |
Any
|
The last value from the list. |
RetryConfig
Bases: BaseModel
Configuration for retry behavior.
Attributes:
| Name | Type | Description |
|---|---|---|
max_retries |
int
|
Maximum number of retry attempts. |
base_delay |
float
|
Base delay in seconds between retries. |
max_delay |
float
|
Maximum delay in seconds between retries. |
exponential_base |
float
|
Base for exponential backoff. Deprecated and will be removed in v0.4. |
jitter |
bool
|
Whether to add random jitter to delays. |
timeout |
float | None
|
Overall timeout in seconds for the entire operation. If None, timeout is disabled. |
retry_on_exceptions |
tuple[type[Exception], ...]
|
Tuple of exception types to retry on. |
validate_delay_constraints()
Validates that max_delay is greater than or equal to base_delay.
Returns:
| Name | Type | Description |
|---|---|---|
RetryConfig |
RetryConfig
|
The validated configuration. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If max_delay is less than base_delay. |
RunAnalyzer(cls)
Bases: NodeVisitor
AST NodeVisitor that analyzes a class to build a RunProfile.
The run analyzer visits the AST nodes of a class to analyze the _run method and build a RunProfile. It will look for the usage of the **kwargs parameter in method calls and subscript expressions. The traversal result is stored as a RunProfile object.
Attributes:
| Name | Type | Description |
|---|---|---|
cls |
type
|
The class to analyze. |
profile |
RunProfile
|
The profile of the run being analyzed. |
Initialize the RunAnalyzer with a class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cls |
type
|
The class to analyze. |
required |
visit_Call(node)
Visit a Call node in the AST.
This node represents a function call in the source code. Here, we are looking for calls to methods that fully pass the kwargs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node |
Call
|
The Call node to visit. |
required |
visit_Subscript(node)
Visit a Subscript node in the AST.
The Subscript node represents a subscripted value in the source code. Example: kwargs["key"]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node |
Subscript
|
The Subscript node to visit. |
required |
asyncify(func, *, cancellable=False, limiter=None)
Wrap a sync function into an awaitable callable using a worker thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func |
Callable[P, R]
|
Synchronous function to wrap. |
required |
cancellable |
bool
|
If True, allow cancellation of the awaiter while running in a worker thread. Defaults to False. |
False
|
limiter |
CapacityLimiter | None
|
Capacity limiter to throttle concurrent thread usage. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
Callable[P, Awaitable[R]]
|
Callable[P, Awaitable[R]]: An async function that when awaited will execute |
Callable[P, Awaitable[R]]
|
worker thread and return its result. |
Usage
async def handler() -> int:
wrapped = asyncify(blocking_func)
return await wrapped(1, 2)
format_chunk_message(chunk, rank=None, include_score=True, include_metadata=True)
Formats a log to display a single chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk |
Chunk
|
The chunk to be formatted. |
required |
rank |
int
|
The optional rank of the formatted chunk. Defaults to None. |
None
|
include_score |
bool
|
Whether to include the score in the formatted message. Defaults to True. |
True
|
include_metadata |
bool
|
Whether to include the metadata in the formatted message. Defaults to True. |
True
|
Returns:
| Name | Type | Description |
|---|---|---|
str |
str
|
A formatted log message that displays information about the logged chunk. |
get_default_portal()
Return the shared default BlockingPortal.
Returns:
| Name | Type | Description |
|---|---|---|
BlockingPortal |
BlockingPortal
|
A process-wide portal running on a background thread. |
get_placeholder_keys(template)
Extracts keys from a template string based on a regex pattern.
This function searches the template for placeholders enclosed in single curly braces {} and ignores
any placeholders within double curly braces {{}}. It returns a list of the keys found.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
template |
str
|
The template string containing placeholders. |
required |
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: A list of keys extracted from the template. |
load_gsheets(client_email, private_key, sheet_id, worksheet_id)
Loads data from a Google Sheets worksheet.
This function retrieves data from a Google Sheets worksheet using service account credentials. It authorizes the client, selects the specified worksheet, and reads the worksheet data. The first row of the worksheet will be treated as the column names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
client_email |
str
|
The client email associated with the service account. |
required |
private_key |
str
|
The private key used for authentication. |
required |
sheet_id |
str
|
The ID of the Google Sheet. |
required |
worksheet_id |
str
|
The ID of the worksheet within the Google Sheet. |
required |
Returns:
| Type | Description |
|---|---|
list[dict[str, str]]
|
list[dict[str, str]]: A list of dictionaries containing the Google Sheets content. |
syncify(async_func, *, portal=None)
Wrap an async function to be callable from synchronous code.
Lifecycle and portals:
1. This helper uses an already running AnyIO BlockingPortal to execute the coroutine.
2. If portal is not provided, a process-wide shared portal is used. Its lifecycle is
managed internally: it is created lazily on first use and shut down automatically at process exit.
3. If you provide a portal, you are expected to manage its lifecycle, typically with a
context manager. This is recommended when making many calls in a bounded scope since it
avoids per-call startup costs while allowing deterministic teardown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
async_func |
Callable[P, Awaitable[R]]
|
Asynchronous function to wrap. |
required |
portal |
BlockingPortal | None
|
Portal to use for calling the async function from sync code. Defaults to None, in which case a shared default portal is used. |
None
|
Returns:
| Type | Description |
|---|---|
Callable[P, R]
|
Callable[P, R]: A synchronous function that runs the coroutine and returns its result. |
Usage
# Use the default shared portal (most convenient)
def do_work(x: int) -> int:
sync_call = syncify(async_func)
return sync_call(x)
# Reuse a scoped portal for multiple calls (deterministic lifecycle)
from anyio.from_thread import start_blocking_portal
with start_blocking_portal() as portal:
sync_call = syncify(async_func, portal=portal)
a = sync_call(1)
b = sync_call(2)
Notes
Creating a brand-new portal per call is discouraged due to the overhead of spinning up and tearing down a background event loop/thread. Prefer the shared portal or a scoped portal reused for a batch of calls.
validate_string_enum(enum_type, value)
Validates that the provided value is a valid string enum value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
enum_type |
type[StrEnum]
|
The type of the string enum. |
required |
value |
str
|
The value to validate. |
required |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the provided value is not a valid string enum value. |