Retriever

Module for retriever classes.

BM25Retriever(data_store)

Bases: BaseVectorRetriever

Retrieves documents from the Elasticsearch data store using BM25.

Examples:

retriever = BM25Retriever(data_store=elasticsearch_data_store)
results = await retriever.retrieve("search query", top_k=10)

Initialize the BM25Retriever.

Deprecated since version 0.5.20: This class is deprecated and will be removed in version 0.6.0. Use gllm_retrieval.retriever.FulltextRetriever instead, which provides the same functionality with support for the new BaseDataStore API and batch queries.

Example
from gllm_retrieval.retriever import FulltextRetriever

# Old way (deprecated)
retriever = BM25Retriever(data_store=data_store)

# New way (recommended)
retriever = FulltextRetriever(data_store=data_store.with_fulltext())

Parameters:

Name Type Description Default
data_store BaseVectorDataStore

The vector data store with BM25 capabilities.

required

BasicSQLRetriever(sql_data_store, lm_request_processor=None, extract_func=None, max_retries=0, preprocess_query_func=None)

Bases: BaseSQLRetriever

Initializes a new instance of the BasicSQLRetriever class.

This class provides a straightforward implementation of the BaseSQLRetriever, using a single data store for document retrieval.

Examples:

from gllm_datastore.sql_data_store.sql_data_store import SQLDataStore
from gllm_retrieval.retriever.sql_retriever.basic_sql_retriever import BasicSQLRetriever

# Create a retriever with an existing data store
retriever = BasicSQLRetriever(sql_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")

Attributes:

Name Type Description
sql_data_store BaseSQLDataStore

The SQL database data store to be used.

lm_request_processor LMRequestProcessor | None

The LMRequestProcessor instance to be used for modifying a failed query. If not None, will modify the query upon failure.

extract_func Callable[[str | list[str] | dict[str, str | list[str]]], str | list[str]]

A function to extract the modified query from the LM output.

max_retries int

The maximum number of retries for the retrieval process.

preprocess_query_func Callable[[str], str] | None

A function to preprocess the query before execution.

logger Logger

The logger instance to be used for logging.

Initializes the BasicSQLRetriever object.

Parameters:

Name Type Description Default
sql_data_store BaseSQLDataStore

The SQL database data store to be used.

required
lm_request_processor LMRequestProcessor | None

The LMRequestProcessor instance to be used for modifying a failed query. If not None, will modify the query upon failure.

None
extract_func Callable[[str | list[str] | dict[str, str | list[str]]], str | list[str]]

A function to extract the transformed query from the output. Defaults to None, in which case a default extractor will be used.

None
max_retries int

The maximum number of retries for the retrieval process. Defaults to 0.

0
preprocess_query_func Callable[[str], str] | None

A function to preprocess the query before execution. Defaults to None.

None

Raises:

Type Description
ValueError

If lm_request_processor is not provided when max_retries is greater than 0.

retrieve(query, event_emitter=None, prompt_kwargs=None, return_query=False) async

Retrieve data based on the query.

This method performs a retrieval operation using the configured data store.

Parameters:

Name Type Description Default
query str

The query string to retrieve documents.

required
event_emitter EventEmitter | None

The event emitter to emit events. Defaults to None.

None
prompt_kwargs dict[str, Any] | None

Additional keyword arguments for the prompt. Defaults to None.

None
return_query bool

If True, returns a tuple of the executed query and the result. If False, returns only the result. Defaults to False.

False

Returns:

Type Description
DataFrame | tuple[str, DataFrame]

pd.DataFrame | tuple[str, pd.DataFrame]: The result of the retrieval process. If return_query is True, returns a tuple of the executed query and the result. If return_query is False, returns only the result.

Raises:

Type Description
ValueError

If the retrieval process fails after the maximum number of retries.
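
The retry behaviour described above (modify a failed query with the LM, give up with a ValueError once max_retries is exhausted) can be sketched as follows. This is a minimal illustration, not the library's implementation: retrieve_with_retries, fake_execute, and fake_fix are hypothetical names, and plain lists stand in for the DataFrame result.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def retrieve_with_retries(
    query: str,
    execute: Callable[[str], Awaitable[Any]],  # hypothetical: runs the SQL, raises on failure
    fix_query: Callable[[str, Exception], Awaitable[str]],  # hypothetical: LM-based repair
    max_retries: int = 0,
    return_query: bool = False,
) -> Any:
    """Run `query`; after each failure ask `fix_query` for a replacement query."""
    current = query
    for attempt in range(max_retries + 1):
        try:
            result = await execute(current)
        except Exception as error:
            if attempt == max_retries:
                # Out of retries: surface the failure as a ValueError.
                raise ValueError(f"retrieval failed after {max_retries} retries") from error
            current = await fix_query(current, error)
        else:
            # Mirror return_query: optionally return the query that actually ran.
            return (current, result) if return_query else result


# Hypothetical stand-ins for the SQL data store and the LM request processor.
async def fake_execute(sql: str) -> list[int]:
    if not sql.startswith("SELECT"):
        raise RuntimeError("syntax error")
    return [1]


async def fake_fix(sql: str, error: Exception) -> str:
    return "SELECT 1"


print(asyncio.run(retrieve_with_retries("SELEC 1", fake_execute, fake_fix, max_retries=1, return_query=True)))
```

With max_retries=0 the first failure is raised immediately, which is consistent with lm_request_processor only being required when max_retries is greater than 0.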

BasicVectorRetriever(data_store)

Bases: BaseVectorRetriever

Initializes a new instance of the BasicVectorRetriever class.

Deprecated since version 0.5.20: This class is deprecated and will be removed in version 0.6.0. Use gllm_retrieval.retriever.VectorRetriever instead, which provides the same functionality with support for the new BaseDataStore API and batch queries.

This class provides a straightforward implementation of the BaseRetriever, using a single data store for document retrieval.

Examples:

retriever = BasicVectorRetriever(data_store=elasticsearch_data_store)
results = await retriever.retrieve("search query", top_k=10)

Attributes:

Name Type Description
data_store BaseVectorDataStore

The data store used for retrieval operations.

Initializes a new instance of the BasicVectorRetriever class.

Parameters:

Name Type Description Default
data_store BaseVectorDataStore

The data store to be used for retrieval operations.

required

EnsembleRetriever(retrievers, weights=None, rank_constant=60, min_candidate=None)

Bases: BaseRetriever[list[FuseableT]]

Retriever that ensembles multiple retrievers.

Examples:

ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.7, 0.3]
)
results = await ensemble_retriever.retrieve("search query", top_k=10)

Attributes:

Name Type Description
retrievers list[BaseRetriever[list[FuseableT]]]

A list of retrievers to ensemble. All retrievers must return the same type that implements the Fuseable protocol.

weights list[float] | None

A list of weights corresponding to the retrievers. Defaults to equal weighting for all retrievers. Weights must be strictly positive. If they do not sum to 1.0, they will be normalized internally and a warning will be logged.

rank_constant int

A constant added to the rank, controlling the balance between the importance of high-ranked items and the consideration given to lower-ranked items. Defaults to 60.

min_candidate int | None

Minimum number of candidates to fetch per retriever before fusion (per-retriever floor). The effective per-retriever k is computed as: max(min_candidate, ceil(top_k / number_of_retrievers)). Defaults to 1 if not provided.
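
As a rough illustration of the weights and min_candidate attributes, the sketch below normalizes a weight list and computes the effective per-retriever k from the formula above. The function names normalize_weights and effective_k are hypothetical and only mirror the documented rules; the actual implementation may differ (for example, it also logs a warning when normalizing).

```python
from __future__ import annotations

import math


def normalize_weights(weights: list[float] | None, n_retrievers: int) -> list[float]:
    """Return weights that sum to 1.0, defaulting to equal weighting."""
    if weights is None:
        return [1.0 / n_retrievers] * n_retrievers
    if len(weights) != n_retrievers:
        raise ValueError("weights must match the number of retrievers")
    if any(w <= 0 for w in weights):
        raise ValueError("weights must be strictly positive")
    total = sum(weights)
    return [w / total for w in weights]


def effective_k(top_k: int, n_retrievers: int, min_candidate: int | None = None) -> int:
    """max(min_candidate, ceil(top_k / number_of_retrievers)), with a floor of 1 by default."""
    floor = 1 if min_candidate is None else min_candidate
    return max(floor, math.ceil(top_k / n_retrievers))


print(normalize_weights([7, 3], 2))          # weights rescaled to sum to 1.0
print(effective_k(10, 3))                    # ceil(10 / 3)
print(effective_k(10, 2, min_candidate=8))   # the floor wins over ceil(10 / 2)
```

Raising min_candidate gives the fusion step more candidates per retriever to work with than a straight split of top_k would.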

Initialize the EnsembleRetriever with a list of retrievers and optional weights.

Parameters:

Name Type Description Default
retrievers list[BaseRetriever[list[FuseableT]]]

A list of retrievers to ensemble. All retrievers must return the same type that implements the Fuseable protocol.

required
weights list[float] | None

A list of weights corresponding to the retrievers. Defaults to equal weighting for all retrievers. Weights must be strictly positive. If they do not sum to 1.0, they will be normalized internally and a warning will be logged.

None
rank_constant int

A constant added to the rank, controlling the balance between the importance of high-ranked items and the consideration given to lower-ranked items. Defaults to 60.

60
min_candidate int | None

Minimum number of candidates to fetch per retriever before fusion (per-retriever floor). The effective per-retriever k is computed as: max(min_candidate, ceil(top_k / number_of_retrievers)). Defaults to 1 if not provided.

None

Raises:

Type Description
ValueError

If the list of retrievers is empty or contains fewer than two retrievers.

ValueError

If the length of weights does not match the number of retrievers.

ValueError

If any weight is non-positive.

ValueError

If the sum of weights is not close to 1.0.

rank_fusion(query, top_k=DEFAULT_TOP_K, query_filter=None, **kwargs) async

Asynchronously retrieve results from each retriever and fuse them into a single reranked list.

Parameters:

Name Type Description Default
query str | None

The query to search for.

required
top_k int

The number of results to return.

DEFAULT_TOP_K
query_filter FilterClause | QueryFilter | None

Filter criteria for the retrieval.

None
**kwargs Any

Additional retrieval parameters.

{}

Returns:

Type Description
list[FuseableT]

list[FuseableT]: A list of reranked items.

retrieve(query, query_filter=None, **kwargs) async

Retrieve documents using ensemble retrieval.

Parameters:

Name Type Description Default
query str | list[str]

The query string or list of query strings to retrieve documents.

required
query_filter FilterClause | QueryFilter | None

Filter criteria for the retrieval. Defaults to None.

None
**kwargs Any

Additional parameters for the retrieval process. Common parameters include:
  1. top_k (int): Maximum number of documents to retrieve. Defaults to DEFAULT_TOP_K.
  2. threshold (float): Minimum score threshold for filtering results. Defaults to None.
  3. timeout (float): Maximum time in seconds to wait for retrieval. Defaults to None.

{}

Returns:

Type Description
list[FuseableT] | list[list[FuseableT]]

list[FuseableT] | list[list[FuseableT]]: Retrieved items. Returns list[FuseableT] for single query, list[list[FuseableT]] for batch queries.

weighted_reciprocal_rank(doc_lists)

Perform weighted Reciprocal Rank Fusion on multiple rank lists.

Parameters:

Name Type Description Default
doc_lists list[list[FuseableT]]

A list of rank lists, where each rank list contains unique items.

required

Returns:

Type Description
list[FuseableT]

list[FuseableT]: The final aggregated list of items sorted by their weighted RRF scores in descending order.

FulltextRetriever(data_store)

Bases: DatastoreChunkRetriever

A fulltext retriever using BaseDataStore with fulltext capability.

This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with fulltext capability for document retrieval. The retrieval strategy depends on the datastore implementation and can be customized via kwargs.

Examples:

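from gllm_datastore.core.filters import filter as F
from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore
from gllm_datastore.data_store.elasticsearch.fulltext import SupportedQueryMethods
from gllm_retrieval.retriever import FulltextRetriever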

data_store = ElasticsearchDataStore(...).with_fulltext(index_name="my_index")
retriever = FulltextRetriever(data_store=data_store)

results = await retriever.retrieve(
    query_filter=F.eq("metadata.category", "AI"),
    top_k=10
)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]

results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]

results = await retriever.retrieve(
    "search query",
    top_k=10,
    strategy=SupportedQueryMethods.BM25,  # or strategy="bm25"
    k1=1.5,  # BM25 parameter
    b=0.75   # BM25 parameter
)
# results: [Chunk(content="...", score=0.92), ...]

batch_results = await retriever.retrieve(
    ["query 1", "query 2"],
    top_k=10,
    strategy="bm25"
)
# batch_results: [
#     [Chunk(content="...", score=0.92), ...],  # results for "query 1"
#     [Chunk(content="...", score=0.88), ...]   # results for "query 2"
# ]

Attributes:

Name Type Description
data_store BaseDataStore

The data store with fulltext capability used for retrieval operations.

Initialize the FulltextRetriever with a data store.

Parameters:

Name Type Description Default
data_store BaseDataStore

The data store to retrieve from. Must have fulltext capability registered.

required

Raises:

Type Description
TypeError

If data_store is not an instance of BaseDataStore.

ValueError

If data_store does not have fulltext capability registered.

retrieve(query=None, query_filter=None, top_k=None, **kwargs) async

retrieve(query: str | None = None, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, **kwargs: Any) -> list[list[Chunk]]

Retrieve documents using fulltext search.

This method performs a retrieval operation using the configured data store's fulltext capability. The retrieval strategy depends on the datastore implementation and can be customized via kwargs.

Parameters:

Name Type Description Default
query str | list[str] | None

The query string or list of query strings. If a list is provided, retrieval is performed for each query concurrently. If None, retrieval is based on filters only (e.g., BY_FIELD strategy). Defaults to None.

None
query_filter FilterClause | QueryFilter | None

Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None.

None
top_k int | None

The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit.

None
**kwargs Any

Additional parameters passed to the data store's retrieve method. For Elasticsearch, common parameters include:
  1. strategy (str): The retrieval strategy (e.g., "bm25", "by_field", "fuzzy").
  2. k1 (float): BM25 parameter controlling term frequency saturation (default ~1.2).
  3. b (float): BM25 parameter controlling document length normalization (default ~0.75).

{}

Returns:

Type Description
list[Chunk] | list[list[Chunk]]

list[Chunk] | list[list[Chunk]]: A list of retrieved documents sorted by relevance score. Returns list[list[Chunk]] if query is a list of strings.
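
The single-query and batch return shapes can be illustrated with a small sketch. It assumes the concurrent batch path behaves like asyncio.gather over one search per query; search_one and this standalone retrieve function are hypothetical stand-ins that return strings instead of Chunk objects.

```python
from __future__ import annotations

import asyncio


async def search_one(query: str | None, top_k: int) -> list[str]:
    """Hypothetical stand-in for a single data-store search."""
    await asyncio.sleep(0)  # yield control, as a real I/O call would
    return [f"{query}-hit-{i}" for i in range(top_k)]


async def retrieve(query: str | list[str] | None = None, top_k: int = 2):
    """Return a flat list for one query, or one list per query for a batch."""
    if isinstance(query, list):
        # Batch: run every query concurrently; results keep the input order.
        return list(await asyncio.gather(*(search_one(q, top_k) for q in query)))
    return await search_one(query, top_k)


print(asyncio.run(retrieve("a")))
print(asyncio.run(retrieve(["a", "b"], top_k=1)))
```

Because gather preserves argument order, the i-th inner list always corresponds to the i-th query in the input list.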

HybridRetriever(data_store)

Bases: DatastoreChunkRetriever

A hybrid retriever using BaseDataStore with hybrid capability.

This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with hybrid capability for document retrieval via combined search paradigms (fulltext and vector).

Examples:

from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_inference.model import OpenAIEM

from gllm_datastore.core.capabilities.hybrid_capability import HybridSearchType, SearchConfig
from gllm_datastore.core.filters import filter as F
from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore

em_invoker = OpenAIEMInvoker(OpenAIEM.TEXT_EMBEDDING_3_SMALL)
hybrid_config = [
    SearchConfig(search_type=HybridSearchType.FULLTEXT, field="text", weight=0.3),
    SearchConfig(search_type=HybridSearchType.VECTOR, field="embedding", weight=0.7, em_invoker=em_invoker),
]
data_store = ElasticsearchDataStore(
    index_name="my_index", url="http://localhost:9200"
).with_hybrid(config=hybrid_config)
retriever = HybridRetriever(data_store=data_store)

results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]

results = await retriever.retrieve(
    "search query",
    query_filter=F.eq("metadata.category", "AI"),
    top_k=10,
    threshold=0.8
)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]

batch_results = await retriever.retrieve(["query 1", "query 2"], top_k=10)
# batch_results: [
#     [Chunk(content="...", score=0.95), ...],  # results for "query 1"
#     [Chunk(content="...", score=0.89), ...]   # results for "query 2"
# ]

Attributes:

Name Type Description
data_store BaseDataStore

The data store with hybrid capability used for retrieval operations.

Initialize the HybridRetriever with a data store.

Parameters:

Name Type Description Default
data_store BaseDataStore

The data store to retrieve from. Must have hybrid capability registered.

required

Raises:

Type Description
TypeError

If data_store is not an instance of BaseDataStore.

ValueError

If data_store does not have hybrid capability registered.

retrieve(query, query_filter=None, top_k=None, threshold=None, **kwargs) async

retrieve(query: str, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[list[Chunk]]

Retrieve documents based on the query using hybrid search.

This method performs a retrieval operation using the configured data store's hybrid capability, combining multiple search paradigms (fulltext and vector).

Parameters:

Name Type Description Default
query str | list[str]

The query string or list of query strings to retrieve documents. If a list is provided, retrieval is performed for each query concurrently.

required
query_filter FilterClause | QueryFilter | None

Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None.

None
top_k int | None

The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit.

None
threshold float | None

The minimum score threshold for filtering results. Defaults to None, in which case no filtering is applied.

None
**kwargs Any

Additional parameters passed to the data store's retrieve method.

{}

Returns:

Type Description
list[Chunk] | list[list[Chunk]]

list[Chunk] | list[list[Chunk]]: A list of retrieved documents sorted by relevance score. Returns list[list[Chunk]] if query is a list of strings.

LightRAGRetriever(data_store)

Bases: BaseGraphRAGRetriever

Retriever implementation for LightRAG-based graph RAG.

This class provides a retriever interface for LightRAG, a graph-based Retrieval Augmented Generation system. It handles the conversion between LightRAG's raw output format and structured data objects, allowing for retrieval of document chunks, knowledge graph entities, and relationships based on natural language queries.

The retriever works with LightRAG data stores to perform graph-aware retrieval operations that leverage both semantic similarity and graph structure to find relevant information.

Examples:

from gllm_datastore.graph_data_store.light_rag_data_store import LightRAGDataStore
from gllm_retrieval.retriever.graph_retriever.light_rag_retriever import LightRAGRetriever
from gllm_retrieval.retriever.graph_retriever.constants import ReturnType

# Create a retriever with an existing data store
retriever = LightRAGRetriever(light_rag_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")

# Retrieve as dictionary with nodes and edges
result_dict = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.DICT
)

# Retrieve as chunk contents (strings)
chunk_contents = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRINGS
)

# Retrieve raw result from LightRAG
raw_result = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRING
)

# Retrieve as synthesized response
result = await retriever.retrieve(
    "What is machine learning?",
    only_need_context=False
)

Attributes:

Name Type Description
data_store BaseLightRAGDataStore

The LightRAG data store used for retrieval operations.

_logger Logger

Logger instance for this class.

Initialize the LightRAGRetriever.

Parameters:

Name Type Description Default
data_store BaseLightRAGDataStore

The LightRAG data store to use for retrieval operations.

required

retrieve(query, retrieval_params=None, event_emitter=None, return_type=ReturnType.CHUNKS, only_need_context=None) async

Retrieve information from LightRAG based on a natural language query.

This method queries the LightRAG data store and processes the results according to the specified return type. It can return the raw result, structured chunks, strings, or a dictionary containing nodes and edges from the knowledge graph.

Parameters:

Name Type Description Default
query str

The natural language query to retrieve information for.

required
retrieval_params dict[str, Any] | None

Optional dictionary of parameters to pass to the LightRAG query. Defaults to None.

None
event_emitter EventEmitter | None

Optional event emitter for tracking retrieval events. Defaults to None.

None
return_type ReturnType

The type of result to return (CHUNKS, STRINGS, DICT, or STRING). Defaults to ReturnType.CHUNKS.

CHUNKS
only_need_context bool | None

Whether to only return the context (True) or the full result (False). If None, uses the value from retrieval_params or defaults to True. Defaults to None.

None

Returns:

Type Description
str | list[str] | list[Chunk] | dict[str, Any]

str | list[str] | list[Chunk] | dict[str, Any]: Depending on the only_need_context and return_type parameters:
  - only_need_context=False: Synthesized response string using LM invoker.
  - only_need_context=True:
    - ReturnType.CHUNKS (default): List of Chunk objects.
    - ReturnType.STRINGS: List of content strings from chunks.
    - ReturnType.DICT: Dictionary representation of LightRAGRetrievalResult.
    - ReturnType.STRING: Raw result string from LightRAG without postprocessing.

LlamaIndexGraphRAGRetriever(data_store, property_graph_retriever=None, llama_index_llm=None, embed_model=None, vector_store=None, **kwargs)

Bases: BaseGraphRAGRetriever

A retriever class for querying a knowledge graph using the LlamaIndex framework.

Examples:

from gllm_datastore.graph_data_store.llama_index_graph_rag_data_store import LlamaIndexGraphRAGDataStore
from gllm_retrieval.retriever.graph_retriever.llama_index_graph_rag_retriever import LlamaIndexGraphRAGRetriever
from gllm_retrieval.retriever.graph_retriever.constants import ReturnType

# Create a retriever with an existing data store
retriever = LlamaIndexGraphRAGRetriever(llama_index_graph_rag_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")

# Retrieve as dictionary with nodes and edges
result_dict = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.DICT
)

# Retrieve as chunk contents (strings)
chunk_contents = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRINGS
)

# Retrieve raw result from LlamaIndexGraphRAG
raw_result = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRING
)

# Retrieve as synthesized response
result = await retriever.retrieve(
    "What is machine learning?",
    only_need_context=False
)

Attributes:

Name Type Description
_index PropertyGraphIndex

The property graph index to use.

_graph_store LlamaIndexGraphRAGDataStore | PropertyGraphStore

The graph store to use.

_llm BaseLLM | None

The language model to use.

_embed_model BaseEmbedding | None

The embedding model to use.

_property_graph_retriever PGRetriever

The property graph retriever to use.

_default_return_type ReturnType

The default return type for retrieve method.

Initializes the LlamaIndexGraphRAGRetriever with the provided components.

Parameters:

Name Type Description Default
data_store LlamaIndexGraphRAGDataStore | PropertyGraphStore

The graph store to use.

required
property_graph_retriever PGRetriever | None

An existing retriever to use.

None
llama_index_llm BaseLLM | None

The language model to use for text-to-Cypher retrieval. Defaults to None. Deprecated: Use data_store with lm_invoker instead. Instantiate the LLM via LlamaIndexGraphRAGDataStore (e.g., LlamaIndexGraphRAGDataStore(lm_invoker=...)).

None
embed_model BaseEmbedding | None

The embedding model to use. Defaults to None. Deprecated: Use data_store with em_invoker instead. Instantiate the embedding model via LlamaIndexGraphRAGDataStore (e.g., LlamaIndexGraphRAGDataStore(em_invoker=...)).

None
vector_store BasePydanticVectorStore | None

The vector store to use.

None
**kwargs Any

Additional keyword arguments. Supported kwargs:
  - default_return_type (ReturnType): Default return type for retrieve method. Defaults to "chunks".

{}

Raises:

Type Description
ValueError

If an invalid return type is provided.

retrieve(query, retrieval_params=None, event_emitter=None, **kwargs) async

Retrieves relevant documents for a given query.

Parameters:

Name Type Description Default
query str

The query string to search for.

required
retrieval_params dict[str, Any] | None

Additional retrieval parameters.

None
event_emitter EventEmitter | None

Event emitter for logging.

None
**kwargs Any

Additional keyword arguments. Supported kwargs:
  - return_type (ReturnType): Type of return value ("chunks" or "strings"). Defaults to value set in constructor.

{}

Returns:

Type Description
str | list[str] | list[Chunk] | dict[str, Any]

str | list[str] | list[Chunk] | dict[str, Any]: The result of the retrieval process based on return_type:
  - If return_type is "chunks": Returns list[Chunk].
  - If return_type is "strings": Returns list[str].
  - Returns an empty list on error.

Raises:

Type Description
ValueError

If an invalid return type is provided.

PIIAwareRetriever(data_store, pii_resolver, weights=None, rank_constant=60, min_candidate=1, metadata_entities_field=METADATA_ENTITIES_FIELD)

Bases: BaseVectorRetriever

Privacy-preserving retriever with hybrid search and rank fusion.

This retriever handles PII in queries by:
  1. Anonymizing queries before search operations
  2. Executing hybrid search combining entity-filtered vector search with semantic search
  3. Applying weighted Reciprocal Rank Fusion (RRF) to combine results
  4. De-anonymizing retrieved chunks before returning to callers

Examples:

retriever = PIIAwareRetriever(
    data_store=data_store,
    pii_resolver=MetadataPIIResolver(),
    weights=[0.3, 0.7],
    rank_constant=60,
    min_candidate=1
)
results = await retriever.retrieve("What did Alice say?", top_k=10)

Parameters:

Name Type Description Default
data_store BaseVectorDataStore

Data store for vector search operations.

required
pii_resolver BasePIIResolver

PII resolver for anonymization/de-anonymization.

required
weights list[float] | None

Weights for [filtered, semantic] results. Defaults to [0.2, 0.8]. Weights must be positive and will be normalized to sum to 1.0.

None
rank_constant int

RRF rank constant (k). Defaults to 60.

60
min_candidate int

Minimum candidates per search method. Defaults to 1.

1
metadata_entities_field str

Field name in metadata containing PII entities. Defaults to "metadata.entities".

METADATA_ENTITIES_FIELD

Initialize the PIIAwareRetriever.

Parameters:

Name Type Description Default
data_store BaseVectorDataStore

Data store for vector search operations.

required
pii_resolver BasePIIResolver

PII resolver for anonymization/de-anonymization.

required
weights list[float] | None

Weights for [filtered, semantic] results. Defaults to [0.2, 0.8]. Must have exactly 2 elements with positive values.

None
rank_constant int

RRF rank constant (k). Must be positive. Defaults to 60.

60
min_candidate int

Minimum candidates per search method. Must be positive. Defaults to 1.

1
metadata_entities_field str

Field name in metadata containing PII entities. Defaults to "metadata.entities".

METADATA_ENTITIES_FIELD

Raises:

Type Description
ValueError

If pii_resolver is not a BasePIIResolver subclass.

ValueError

If weights does not have exactly 2 elements or contains non-positive values.

ValueError

If rank_constant or min_candidate are not positive integers.

weighted_reciprocal_rank(doc_lists)

Perform weighted Reciprocal Rank Fusion on multiple rank lists.

This method implements the Weighted Reciprocal Rank Fusion (RRF) algorithm, which combines multiple ranked document lists into a single ranked list. RRF is particularly effective for combining results from different retrieval strategies (e.g., filtered search and semantic search).

The RRF score for each document is calculated as:

  score = sum(weight_i / (rank_i + k)) for each list i

where rank_i is the document's rank in list i (1-based), and k is the rank constant.

Examples:

filtered_results = [chunk1, chunk2, chunk3]  # Ranked by entity filtering
semantic_results = [chunk2, chunk1, chunk4]  # Ranked by semantic similarity
fused = retriever.weighted_reciprocal_rank([filtered_results, semantic_results])
# Returns chunks ordered by combined RRF scores

Parameters:

Name Type Description Default
doc_lists list[list[Chunk]]

A list of rank lists to fuse.
  - Must contain exactly 2 lists corresponding to [filtered, semantic] results
  - Each inner list should contain Chunk objects sorted by relevance
  - Lists may contain overlapping documents (same document ID in multiple lists)
  - Empty lists are allowed and will be handled gracefully

required

Returns:

Type Description
list[Chunk]

list[Chunk]: The final aggregated list of unique documents sorted by their weighted RRF scores in descending order. Documents with higher scores appear first.

Raises:

Type Description
ValueError

If the number of rank lists doesn't match the configured weights count.

Note
  1. Documents are deduplicated by their id field
  2. The rank_constant parameter controls the influence of rank position
  3. Higher rank_constant values reduce the impact of rank differences
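
The scoring rule and notes above can be sketched in a few lines of plain Python. This is an illustrative version under stated assumptions, not the library's code: Doc is a hypothetical stand-in for Chunk that carries only an id, and weighted_rrf takes the weights and rank constant as arguments instead of reading them from the retriever.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Doc:
    """Hypothetical stand-in for a Chunk; only the id matters for fusion."""
    id: str


def weighted_rrf(doc_lists: list[list[Doc]], weights: list[float], rank_constant: int = 60) -> list[Doc]:
    """Fuse ranked lists with score = sum(weight_i / (rank_i + k))."""
    if len(doc_lists) != len(weights):
        raise ValueError("number of rank lists must match the number of weights")
    scores: dict[str, float] = {}
    first_seen: dict[str, Doc] = {}
    for docs, weight in zip(doc_lists, weights):
        for rank, doc in enumerate(docs, start=1):  # ranks are 1-based
            scores[doc.id] = scores.get(doc.id, 0.0) + weight / (rank + rank_constant)
            first_seen.setdefault(doc.id, doc)  # deduplicate by id
    ordered_ids = sorted(scores, key=scores.get, reverse=True)
    return [first_seen[doc_id] for doc_id in ordered_ids]


c1, c2, c3, c4 = Doc("c1"), Doc("c2"), Doc("c3"), Doc("c4")
filtered_results = [c1, c2, c3]   # ranked by entity filtering
semantic_results = [c2, c1, c4]   # ranked by semantic similarity
fused = weighted_rrf([filtered_results, semantic_results], weights=[0.2, 0.8])
print([doc.id for doc in fused])  # ['c2', 'c1', 'c4', 'c3']
```

With weights [0.2, 0.8], c2 outranks c1 because its first place in the heavier semantic list contributes 0.8 / 61, which outweighs c1's first place in the filtered list at 0.2 / 61.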

ParentDocumentRetriever(child_data_store, parent_data_store, parent_metadata_field=DEFAULT_PARENT_METADATA_FIELD, blank_id_str_set=None, filter_key_id=DEFAULT_FILTER_KEY_ID, parent_top_k=None)

Bases: VectorRetriever

A retriever that retrieves parent chunks based on child chunk similarity search.

The ParentDocumentRetriever queries a child data store using vector similarity search, then uses the metadata from child chunks to retrieve corresponding parent chunks from a parent data store. The parent-child relationship is defined through a configurable metadata field.

If a child chunk does not have a valid parent metadata field, the child chunk itself is included in the results. Duplicate parent chunks are removed while preserving the order from the child retrieval.

Important
  1. The top_k parameter passed to retrieve controls the number of child chunks fetched from the vector store, not the number of final parent results.
  2. Because multiple child chunks may map to the same parent (deduplication) or to distinct parents, the result count can be fewer or more than top_k.
  3. To cap the final output size, set parent_top_k during initialization.
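
The resolution rules described above (child fallback, order-preserving deduplication, optional parent_top_k cap) can be sketched as follows. This is a simplified model under stated assumptions: chunks are plain dicts, resolve_parents is a hypothetical helper, parents are looked up in an in-memory dict rather than a data store, and every referenced parent ID is assumed to exist.

```python
from __future__ import annotations

BLANK_IDS = {"-", "", " "}  # the documented default for blank_id_str_set


def resolve_parents(
    children: list[dict],
    parents_by_id: dict[str, dict],
    parent_field: str = "parent_chunk",
    parent_top_k: int | None = None,
) -> list[dict]:
    """Map ranked child chunks to parents, keeping first-seen order."""
    results: list[dict] = []
    seen: set[tuple[str, str]] = set()
    for child in children:
        parent_id = child.get("metadata", {}).get(parent_field)
        if parent_id is None or parent_id in BLANK_IDS:
            # No valid parent reference: the child itself is returned.
            key, item = ("child", child["id"]), child
        else:
            key, item = ("parent", parent_id), parents_by_id[parent_id]
        if key not in seen:  # skip parents that were already added
            seen.add(key)
            results.append(item)
    return results if parent_top_k is None else results[:parent_top_k]


parents = {"p1": {"id": "p1"}, "p2": {"id": "p2"}}
children = [
    {"id": "c1", "metadata": {"parent_chunk": "p1"}},
    {"id": "c2", "metadata": {"parent_chunk": "p1"}},  # same parent as c1
    {"id": "c3", "metadata": {"parent_chunk": "-"}},   # blank parent ID
    {"id": "c4", "metadata": {"parent_chunk": "p2"}},
]
print([item["id"] for item in resolve_parents(children, parents)])
```

Here four child hits collapse into three results, which is why top_k alone does not determine the final result count.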

Examples:

from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore

child_store = ElasticsearchDataStore(...).with_vector(em_invoker=embedding_model)
parent_store = ElasticsearchDataStore(...).with_fulltext()

retriever = ParentDocumentRetriever(
    child_data_store=child_store,
    parent_data_store=parent_store,
)

results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="parent doc 1", ...), Chunk(content="parent doc 2", ...), ...]

Attributes:

Name Type Description
data_store BaseDataStore

The child data store with vector capability (alias for child_data_store).

parent_data_store BaseDataStore

The parent data store with fulltext capability.

parent_metadata_field str

The metadata field linking child chunks to parent chunks.

blank_id_str_set set[str]

Set of strings considered as blank/invalid parent IDs.

filter_key_id str

The filter key used to query parent chunks by ID.

parent_top_k int | None

Maximum number of final parent/result chunks to return.

Initialize the ParentDocumentRetriever with child and parent data stores.

Parameters:

Name Type Description Default
child_data_store BaseDataStore

The data store for child chunk similarity search. Must have vector capability registered.

required
parent_data_store BaseDataStore

The data store for parent chunk retrieval by ID. Must have fulltext capability registered.

required
parent_metadata_field str

The metadata field linking child chunks to parent chunks. Defaults to "parent_chunk".

DEFAULT_PARENT_METADATA_FIELD
blank_id_str_set set[str] | None

Set of strings considered as blank/invalid parent IDs. Defaults to None, which uses {"-", "", " "}.

None
filter_key_id str

The filter key used to query parent chunks by ID. Defaults to "_id".

DEFAULT_FILTER_KEY_ID
parent_top_k int | None

Maximum number of final parent/result chunks to return. If None, all deduplicated results are returned. Defaults to None.

None

Raises:

Type Description
TypeError

If child_data_store or parent_data_store is not an instance of BaseDataStore.

ValueError

If child_data_store does not have vector capability registered or parent_data_store does not have fulltext capability registered.

VectorRetriever(data_store)

Bases: DatastoreChunkRetriever

A vector retriever using BaseDataStore with vector capability.

This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with vector capability for document retrieval via similarity search.

Examples:

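from gllm_datastore.core.filters import filter as F
from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore
from gllm_retrieval.retriever import VectorRetriever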

data_store = ElasticsearchDataStore(...).with_vector(em_invoker=embedding_model)
retriever = VectorRetriever(data_store=data_store)

results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]

results = await retriever.retrieve(
    "search query",
    query_filter=F.eq("metadata.category", "AI"),
    top_k=10,
    threshold=0.8
)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]


batch_results = await retriever.retrieve(["query 1", "query 2"], top_k=10)
# batch_results: [
#     [Chunk(content="...", score=0.95), ...],  # results for "query 1"
#     [Chunk(content="...", score=0.89), ...]   # results for "query 2"
# ]

Attributes:

Name Type Description
data_store BaseDataStore

The data store with vector capability used for retrieval operations.

Initialize the VectorRetriever with a data store.

Parameters:

Name Type Description Default
data_store BaseDataStore

The data store to retrieve from. Must have vector capability registered.

required

Raises:

Type Description
TypeError

If data_store is not an instance of BaseDataStore.

ValueError

If data_store does not have vector capability registered.

retrieve(query, query_filter=None, top_k=None, threshold=None, **kwargs) async

retrieve(query: str, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[list[Chunk]]

Retrieve documents based on the query using vector similarity search.

This method performs a retrieval operation using the configured data store's vector capability.

Parameters:

Name Type Description Default
query str | list[str]

The query string or list of query strings to retrieve documents. If a list is provided, retrieval is performed for each query concurrently.

required
query_filter FilterClause | QueryFilter | None

Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None.

None
top_k int | None

The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit.

None
threshold float | None

The minimum score threshold for filtering results. Defaults to None, in which case no filtering is applied.

None
**kwargs Any

Additional parameters passed to the data store's retrieve method.

{}

Returns:

Type Description
list[Chunk] | list[list[Chunk]]

list[Chunk] | list[list[Chunk]]: A list of retrieved documents sorted by similarity score. Returns list[list[Chunk]] if query is a list of strings.
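
To make the interaction of threshold and top_k concrete, here is a minimal sketch over (content, score) pairs. It assumes the threshold is inclusive and is applied before the top_k cut; neither detail is confirmed by the reference above, and apply_threshold is a hypothetical helper, not part of the library.

```python
from __future__ import annotations


def apply_threshold(
    scored: list[tuple[str, float]],
    threshold: float | None = None,
    top_k: int | None = None,
) -> list[tuple[str, float]]:
    """Keep items scoring at least `threshold`, best first, capped at `top_k`."""
    # Assumption: a score equal to the threshold is kept.
    kept = [item for item in scored if threshold is None or item[1] >= threshold]
    kept.sort(key=lambda item: item[1], reverse=True)
    return kept if top_k is None else kept[:top_k]


hits = [("a", 0.95), ("b", 0.5), ("c", 0.8)]
print(apply_threshold(hits, threshold=0.8))
print(apply_threshold(hits, top_k=2))
```

With threshold=None nothing is filtered out, matching the documented default.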