Retriever
Module for retriever classes.
BM25Retriever(data_store)
Bases: BaseVectorRetriever
Retrieves documents from the Elasticsearch data store using BM25.
Examples:

```python
retriever = BM25Retriever(data_store=elasticsearch_data_store)
results = await retriever.retrieve("search query", top_k=10)
```
Initialize the BM25Retriever.
.. deprecated:: 0.5.20
    This class is deprecated and will be removed in version 0.6.0.
    Use :class:`gllm_retrieval.retriever.FulltextRetriever` instead, which provides
    the same functionality with support for the new BaseDataStore API and batch queries.
Example

```python
from gllm_retrieval.retriever import FulltextRetriever

# Old way (deprecated)
retriever = BM25Retriever(data_store=data_store)

# New way (recommended)
retriever = FulltextRetriever(data_store=data_store.with_fulltext())
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseVectorDataStore | The vector data store with BM25 capabilities. | required |
BasicSQLRetriever(sql_data_store, lm_request_processor=None, extract_func=None, max_retries=0, preprocess_query_func=None)
Bases: BaseSQLRetriever
Initializes a new instance of the BasicSQLRetriever class.
This class provides a straightforward implementation of the BaseSQLRetriever, using a single data store for document retrieval.
Examples:

```python
from gllm_datastore.sql_data_store.sql_data_store import SQLDataStore
from gllm_retrieval.retriever.sql_retriever.basic_sql_retriever import BasicSQLRetriever

# Create a retriever with an existing data store
retriever = BasicSQLRetriever(sql_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")
```
Attributes:

| Name | Type | Description |
|---|---|---|
| sql_data_store | BaseSQLDataStore | The SQL database data store to be used. |
| lm_request_processor | LMRequestProcessor \| None | The LMRequestProcessor instance to be used for modifying a failed query. If not None, will modify the query upon failure. |
| extract_func | Callable[[str \| list[str] \| dict[str, str \| list[str]]], str \| list[str]] | A function to extract the modified query from the LM output. |
| max_retries | int | The maximum number of retries for the retrieval process. |
| preprocess_query_func | Callable[[str], str] \| None | A function to preprocess the query before execution. |
| logger | Logger | The logger instance to be used for logging. |
Initializes the BasicSQLRetriever object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sql_data_store | BaseSQLDataStore | The SQL database data store to be used. | required |
| lm_request_processor | LMRequestProcessor \| None | The LMRequestProcessor instance to be used for modifying a failed query. If not None, will modify the query upon failure. | None |
| extract_func | Callable[[str \| list[str] \| dict[str, str \| list[str]]], str \| list[str]] | A function to extract the transformed query from the output. Defaults to None, in which case a default extractor will be used. | None |
| max_retries | int | The maximum number of retries for the retrieval process. Defaults to 0. | 0 |
| preprocess_query_func | Callable[[str], str] \| None | A function to preprocess the query before execution. Defaults to None. | None |
Raises:

| Type | Description |
|---|---|
| ValueError | If lm_request_processor is not provided when max_retries is greater than 0. |
retrieve(query, event_emitter=None, prompt_kwargs=None, return_query=False)
async
Retrieve data based on the query.
This method performs a retrieval operation using the configured data store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The query string to retrieve documents. | required |
| event_emitter | EventEmitter \| None | The event emitter to emit events. Defaults to None. | None |
| prompt_kwargs | dict[str, Any] \| None | Additional keyword arguments for the prompt. Defaults to None. | None |
| return_query | bool | If True, returns a tuple of the executed query and the result. If False, returns only the result. Defaults to False. | False |
Returns:

| Type | Description |
|---|---|
| DataFrame \| tuple[str, DataFrame] | The result of the retrieval process. If return_query is True, returns a tuple of the executed query and the result; if return_query is False, returns only the result. |
Raises:

| Type | Description |
|---|---|
| ValueError | If the retrieval process fails after the maximum number of retries. |
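The retry behavior described above (modify a failed query with the LM, up to `max_retries` attempts, then raise) can be sketched in plain Python. `execute_query` and `fix_query` are hypothetical stand-ins for the data store execution and the LMRequestProcessor-based query repair; this is an illustration of the documented control flow, not the library implementation:

```python
def retrieve_with_retries(query, execute_query, fix_query, max_retries=0):
    """Run a query; on failure, let a repair step rewrite it up to max_retries times."""
    attempt = 0
    while True:
        try:
            return execute_query(query)
        except Exception as error:
            if attempt >= max_retries:
                # Mirrors the documented ValueError once retries are exhausted.
                raise ValueError(
                    f"Retrieval failed after {max_retries} retries"
                ) from error
            query = fix_query(query, error)  # LMRequestProcessor analog
            attempt += 1
```

This also shows why the constructor rejects `max_retries > 0` without an `lm_request_processor`: without a repair step there is nothing to change between attempts.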
BasicVectorRetriever(data_store)
Bases: BaseVectorRetriever
Initializes a new instance of the BasicVectorRetriever class.
.. deprecated:: 0.5.20
    This class is deprecated and will be removed in version 0.6.0.
    Use :class:`gllm_retrieval.retriever.VectorRetriever` instead, which provides
    the same functionality with support for the new BaseDataStore API and batch queries.
This class provides a straightforward implementation of the BaseRetriever, using a single data store for document retrieval.
Examples:

```python
retriever = BasicVectorRetriever(data_store=elasticsearch_data_store)
results = await retriever.retrieve("search query", top_k=10)
```
Attributes:

| Name | Type | Description |
|---|---|---|
| data_store | BaseVectorDataStore | The data store used for retrieval operations. |
Initializes a new instance of the BasicVectorRetriever class.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseVectorDataStore | The data store to be used for retrieval operations. | required |
EnsembleRetriever(retrievers, weights=None, rank_constant=60, min_candidate=None)
Bases: BaseRetriever[list[FuseableT]]
Retriever that ensembles the multiple retrievers.
Examples:

```python
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.7, 0.3]
)
results = await ensemble_retriever.retrieve("search query", top_k=10)
```
Attributes:

| Name | Type | Description |
|---|---|---|
| retrievers | list[BaseRetriever[list[FuseableT]]] | A list of retrievers to ensemble. All retrievers must return the same type that implements the Fuseable protocol. |
| weights | list[float] \| None | A list of weights corresponding to the retrievers. Defaults to equal weighting for all retrievers. Weights must be strictly positive. If they do not sum to 1.0, they will be normalized internally and a warning will be logged. |
| rank_constant | int | A constant added to the rank, controlling the balance between the importance of high-ranked items and the consideration given to lower-ranked items. Defaults to 60. |
| min_candidate | int \| None | Minimum number of candidates to fetch per retriever before fusion (per-retriever floor). The effective per-retriever k is computed as max(min_candidate, ceil(top_k / number_of_retrievers)). Defaults to 1 if not provided. |
Initialize the EnsembleRetriever with a list of retrievers and optional weights.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| retrievers | list[BaseRetriever[list[FuseableT]]] | A list of retrievers to ensemble. All retrievers must return the same type that implements the Fuseable protocol. | required |
| weights | list[float] \| None | A list of weights corresponding to the retrievers. Defaults to equal weighting for all retrievers. Weights must be strictly positive. If they do not sum to 1.0, they will be normalized internally and a warning will be logged. | None |
| rank_constant | int | A constant added to the rank, controlling the balance between the importance of high-ranked items and the consideration given to lower-ranked items. Defaults to 60. | 60 |
| min_candidate | int \| None | Minimum number of candidates to fetch per retriever before fusion (per-retriever floor). The effective per-retriever k is computed as max(min_candidate, ceil(top_k / number_of_retrievers)). Defaults to 1 if not provided. | None |
Raises:

| Type | Description |
|---|---|
| ValueError | If the list of retrievers is empty or contains fewer than two retrievers. |
| ValueError | If the length of weights does not match the number of retrievers. |
| ValueError | If any weight is non-positive. |
| ValueError | If the sum of weights is not close to 1.0. |
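The per-retriever candidate count described for `min_candidate` can be checked with a small standalone helper. This is a sketch of the documented formula, not library code; the name `effective_k` is illustrative:

```python
import math

def effective_k(top_k, num_retrievers, min_candidate=None):
    """Per-retriever fetch size: max(min_candidate, ceil(top_k / number_of_retrievers))."""
    floor = min_candidate if min_candidate is not None else 1  # documented default
    return max(floor, math.ceil(top_k / num_retrievers))
```

For example, with `top_k=10` and three retrievers, each retriever fetches 4 candidates before fusion; a larger `min_candidate` raises that floor.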
rank_fusion(query, top_k=DEFAULT_TOP_K, query_filter=None, **kwargs)
async
Asynchronously retrieve the results of the retrievers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str \| None | The query to search for. | required |
| top_k | int | The number of results to return. | DEFAULT_TOP_K |
| query_filter | FilterClause \| QueryFilter \| None | Filter criteria for the retrieval. | None |
| **kwargs | Any | Additional retrieval parameters. | {} |
Returns:

| Type | Description |
|---|---|
| list[FuseableT] | A list of reranked items. |
retrieve(query, query_filter=None, **kwargs)
async
Retrieve documents using ensemble retrieval.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str \| list[str] | The query string or list of query strings to retrieve documents. | required |
| query_filter | FilterClause \| QueryFilter \| None | Filter criteria for the retrieval. Defaults to None. | None |
| **kwargs | Any | Additional parameters for the retrieval process. Common parameters include: 1. top_k (int): Maximum number of documents to retrieve. Defaults to DEFAULT_TOP_K. 2. threshold (float): Minimum score threshold for filtering results. Defaults to None. 3. timeout (float): Maximum time in seconds to wait for retrieval. Defaults to None. | {} |
Returns:

| Type | Description |
|---|---|
| list[FuseableT] \| list[list[FuseableT]] | Retrieved items. Returns list[FuseableT] for a single query, list[list[FuseableT]] for batch queries. |
weighted_reciprocal_rank(doc_lists)
Perform weighted Reciprocal Rank Fusion on multiple rank lists.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| doc_lists | list[list[FuseableT]] | A list of rank lists, where each rank list contains unique items. | required |
Returns:

| Type | Description |
|---|---|
| list[FuseableT] | The final aggregated list of items sorted by their weighted RRF scores in descending order. |
FulltextRetriever(data_store)
Bases: DatastoreChunkRetriever
A fulltext retriever using BaseDataStore with fulltext capability.
This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with fulltext capability for document retrieval. The retrieval strategy depends on the datastore implementation and can be customized via kwargs.
Examples:

```python
from gllm_datastore.data_store.elasticsearch.fulltext import SupportedQueryMethods
from gllm_datastore.core.filters import filter as F

data_store = ElasticsearchDataStore(...).with_fulltext(index_name="my_index")
retriever = FulltextRetriever(data_store=data_store)

# Filter-only retrieval (no query string)
results = await retriever.retrieve(
    query_filter=F.eq("metadata.category", "AI"),
    top_k=10
)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]

# Plain fulltext query
results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]

# Explicit BM25 strategy with tuning parameters
results = await retriever.retrieve(
    "search query",
    top_k=10,
    strategy=SupportedQueryMethods.BM25,  # or strategy="bm25"
    k1=1.5,  # BM25 parameter
    b=0.75  # BM25 parameter
)
# results: [Chunk(content="...", score=0.92), ...]

# Batch queries
batch_results = await retriever.retrieve(
    ["query 1", "query 2"],
    top_k=10,
    strategy="bm25"
)
# batch_results: [
#     [Chunk(content="...", score=0.92), ...],  # results for "query 1"
#     [Chunk(content="...", score=0.88), ...]   # results for "query 2"
# ]
```
Attributes:

| Name | Type | Description |
|---|---|---|
| data_store | BaseDataStore | The data store with fulltext capability used for retrieval operations. |
Initialize the FulltextRetriever with a data store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseDataStore | The data store to retrieve from. Must have fulltext capability registered. | required |
Raises:

| Type | Description |
|---|---|
| TypeError | If data_store is not an instance of BaseDataStore. |
| ValueError | If data_store does not have fulltext capability registered. |
retrieve(query=None, query_filter=None, top_k=None, **kwargs)
async
retrieve(query: str | None = None, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, **kwargs: Any) -> list[list[Chunk]]
Retrieve documents using fulltext search.
This method performs a retrieval operation using the configured data store's fulltext capability. The retrieval strategy depends on the datastore implementation and can be customized via kwargs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str \| list[str] \| None | The query string or list of query strings. If a list is provided, retrieval is performed for each query concurrently. If None, retrieval is based on filters only (e.g., BY_FIELD strategy). Defaults to None. | None |
| query_filter | FilterClause \| QueryFilter \| None | Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None. | None |
| top_k | int \| None | The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit. | None |
| **kwargs | Any | Additional parameters passed to the data store's retrieve method, such as strategy-specific options for Elasticsearch. | {} |
Returns:

| Type | Description |
|---|---|
| list[Chunk] \| list[list[Chunk]] | A list of retrieved documents sorted by relevance score. Returns list[list[Chunk]] if query is a list of strings. |
HybridRetriever(data_store)
Bases: DatastoreChunkRetriever
A hybrid retriever using BaseDataStore with hybrid capability.
This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with hybrid capability for document retrieval via combined search paradigms (fulltext and vector).
Examples:

```python
from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_inference.model import OpenAIEM
from gllm_datastore.core.capabilities.hybrid_capability import HybridSearchType, SearchConfig
from gllm_datastore.core.filters import filter as F
from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore

em_invoker = OpenAIEMInvoker(OpenAIEM.TEXT_EMBEDDING_3_SMALL)
hybrid_config = [
    SearchConfig(search_type=HybridSearchType.FULLTEXT, field="text", weight=0.3),
    SearchConfig(search_type=HybridSearchType.VECTOR, field="embedding", weight=0.7, em_invoker=em_invoker),
]
data_store = ElasticsearchDataStore(
    index_name="my_index", url="http://localhost:9200"
).with_hybrid(config=hybrid_config)
retriever = HybridRetriever(data_store=data_store)

results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]

results = await retriever.retrieve(
    "search query",
    query_filter=F.eq("metadata.category", "AI"),
    top_k=10,
    threshold=0.8
)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]

batch_results = await retriever.retrieve(["query 1", "query 2"], top_k=10)
# batch_results: [
#     [Chunk(content="...", score=0.95), ...],  # results for "query 1"
#     [Chunk(content="...", score=0.89), ...]   # results for "query 2"
# ]
```
Attributes:

| Name | Type | Description |
|---|---|---|
| data_store | BaseDataStore | The data store with hybrid capability used for retrieval operations. |
Initialize the HybridRetriever with a data store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseDataStore | The data store to retrieve from. Must have hybrid capability registered. | required |
Raises:

| Type | Description |
|---|---|
| TypeError | If data_store is not an instance of BaseDataStore. |
| ValueError | If data_store does not have hybrid capability registered. |
retrieve(query, query_filter=None, top_k=None, threshold=None, **kwargs)
async
retrieve(query: str, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[list[Chunk]]
Retrieve documents based on the query using hybrid search.
This method performs a retrieval operation using the configured data store's hybrid capability, combining multiple search paradigms (fulltext and vector).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str \| list[str] | The query string or list of query strings to retrieve documents. If a list is provided, retrieval is performed for each query concurrently. | required |
| query_filter | FilterClause \| QueryFilter \| None | Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None. | None |
| top_k | int \| None | The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit. | None |
| threshold | float \| None | The minimum score threshold for filtering results. Defaults to None, in which case no filtering is applied. | None |
| **kwargs | Any | Additional parameters passed to the data store's retrieve method. | {} |
Returns:

| Type | Description |
|---|---|
| list[Chunk] \| list[list[Chunk]] | A list of retrieved documents sorted by relevance score. Returns list[list[Chunk]] if query is a list of strings. |
LightRAGRetriever(data_store)
Bases: BaseGraphRAGRetriever
Retriever implementation for LightRAG-based graph RAG.
This class provides a retriever interface for LightRAG, a graph-based Retrieval Augmented Generation system. It handles the conversion between LightRAG's raw output format and structured data objects, allowing for retrieval of document chunks, knowledge graph entities, and relationships based on natural language queries.
The retriever works with LightRAG data stores to perform graph-aware retrieval operations that leverage both semantic similarity and graph structure to find relevant information.
Examples:

```python
from gllm_datastore.graph_data_store.light_rag_data_store import LightRAGDataStore
from gllm_retrieval.retriever.graph_retriever.light_rag_retriever import LightRAGRetriever
from gllm_retrieval.retriever.graph_retriever.constants import ReturnType

# Create a retriever with an existing data store
retriever = LightRAGRetriever(light_rag_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")

# Retrieve as dictionary with nodes and edges
result_dict = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.DICT
)

# Retrieve as chunk contents (strings)
chunk_contents = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRINGS
)

# Retrieve raw result from LightRAG
raw_result = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRING
)

# Retrieve as synthesized response
result = await retriever.retrieve(
    "What is machine learning?",
    only_need_context=False
)
```
Attributes:

| Name | Type | Description |
|---|---|---|
| data_store | BaseLightRAGDataStore | The LightRAG data store used for retrieval operations. |
| _logger | Logger | Logger instance for this class. |
Initialize the LightRAGRetriever.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseLightRAGDataStore | The LightRAG data store to use for retrieval operations. | required |
retrieve(query, retrieval_params=None, event_emitter=None, return_type=ReturnType.CHUNKS, only_need_context=None)
async
Retrieve information from LightRAG based on a natural language query.
This method queries the LightRAG data store and processes the results according to the specified return type. It can return the raw result, structured chunks, strings, or a dictionary containing nodes and edges from the knowledge graph.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The natural language query to retrieve information for. | required |
| retrieval_params | dict[str, Any] \| None | Optional dictionary of parameters to pass to the LightRAG query. Defaults to None. | None |
| event_emitter | EventEmitter \| None | Optional event emitter for tracking retrieval events. Defaults to None. | None |
| return_type | ReturnType | The type of result to return (CHUNKS, STRINGS, DICT, or STRING). Defaults to ReturnType.CHUNKS. | CHUNKS |
| only_need_context | bool \| None | Whether to only return the context (True) or the full result (False). If None, uses the value from retrieval_params or defaults to True. Defaults to None. | None |
Returns:

| Type | Description |
|---|---|
| str \| list[str] \| list[Chunk] \| dict[str, Any] | Depending on the only_need_context and return_type parameters. With only_need_context=False: a synthesized response string using the LM invoker. With only_need_context=True: ReturnType.CHUNKS (default) returns a list of Chunk objects; ReturnType.STRINGS returns a list of content strings from chunks; ReturnType.DICT returns a dictionary representation of LightRAGRetrievalResult; ReturnType.STRING returns the raw result string from LightRAG without postprocessing. |
LlamaIndexGraphRAGRetriever(data_store, property_graph_retriever=None, llama_index_llm=None, embed_model=None, vector_store=None, **kwargs)
Bases: BaseGraphRAGRetriever
A retriever class for querying a knowledge graph using the LlamaIndex framework.
Examples:

```python
from gllm_datastore.graph_data_store.llama_index_graph_rag_data_store import LlamaIndexGraphRAGDataStore
from gllm_retrieval.retriever.graph_retriever.llama_index_graph_rag_retriever import LlamaIndexGraphRAGRetriever
from gllm_retrieval.retriever.graph_retriever.constants import ReturnType

# Create a retriever with an existing data store
retriever = LlamaIndexGraphRAGRetriever(llama_index_graph_rag_data_store)

# Retrieve as chunks
chunks = await retriever.retrieve("What is machine learning?")

# Retrieve as dictionary with nodes and edges
result_dict = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.DICT
)

# Retrieve as chunk contents (strings)
chunk_contents = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRINGS
)

# Retrieve raw result from LlamaIndexGraphRAG
raw_result = await retriever.retrieve(
    "What is machine learning?",
    return_type=ReturnType.STRING
)

# Retrieve as synthesized response
result = await retriever.retrieve(
    "What is machine learning?",
    only_need_context=False
)
```
Attributes:

| Name | Type | Description |
|---|---|---|
| _index | PropertyGraphIndex | The property graph index to use. |
| _graph_store | LlamaIndexGraphRAGDataStore \| PropertyGraphStore | The graph store to use. |
| _llm | BaseLLM \| None | The language model to use. |
| _embed_model | BaseEmbedding \| None | The embedding model to use. |
| _property_graph_retriever | PGRetriever | The property graph retriever to use. |
| _default_return_type | ReturnType | The default return type for the retrieve method. |
Initializes the LlamaIndexGraphRAGRetriever with the provided components.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | LlamaIndexGraphRAGDataStore \| PropertyGraphStore | The graph store to use. | required |
| property_graph_retriever | PGRetriever \| None | An existing retriever to use. | None |
| llama_index_llm | BaseLLM \| None | The language model to use for text-to-Cypher retrieval. Defaults to None. Deprecated: instantiate the LLM via the data store instead, e.g. LlamaIndexGraphRAGDataStore(lm_invoker=...). | None |
| embed_model | BaseEmbedding \| None | The embedding model to use. Defaults to None. Deprecated: instantiate the embedding model via the data store instead, e.g. LlamaIndexGraphRAGDataStore(em_invoker=...). | None |
| vector_store | BasePydanticVectorStore \| None | The vector store to use. | None |
| **kwargs | Any | Additional keyword arguments. Supported kwargs: default_return_type (ReturnType), the default return type for the retrieve method. Defaults to "chunks". | {} |
|
Raises:

| Type | Description |
|---|---|
| ValueError | If an invalid return type is provided. |
retrieve(query, retrieval_params=None, event_emitter=None, **kwargs)
async
Retrieves relevant documents for a given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The query string to search for. | required |
| retrieval_params | dict[str, Any] \| None | Additional retrieval parameters. | None |
| event_emitter | EventEmitter \| None | Event emitter for logging. | None |
| **kwargs | Any | Additional keyword arguments. Supported kwargs: return_type (ReturnType), the type of return value ("chunks" or "strings"). Defaults to the value set in the constructor. | {} |
Returns:

| Type | Description |
|---|---|
| str \| list[str] \| list[Chunk] \| dict[str, Any] | The result of the retrieval process based on return_type: if return_type is "chunks", returns list[Chunk]; if return_type is "strings", returns list[str]; returns an empty list on error. |
Raises:

| Type | Description |
|---|---|
| ValueError | If an invalid return type is provided. |
PIIAwareRetriever(data_store, pii_resolver, weights=None, rank_constant=60, min_candidate=1, metadata_entities_field=METADATA_ENTITIES_FIELD)
Bases: BaseVectorRetriever
Privacy-preserving retriever with hybrid search and rank fusion.
This retriever handles PII in queries by:

1. Anonymizing queries before search operations
2. Executing hybrid search combining entity-filtered vector search with semantic search
3. Applying weighted Reciprocal Rank Fusion (RRF) to combine results
4. De-anonymizing retrieved chunks before returning to callers
Examples:

```python
retriever = PIIAwareRetriever(
    data_store=data_store,
    pii_resolver=MetadataPIIResolver(),
    weights=[0.3, 0.7],
    rank_constant=60,
    min_candidate=1
)
results = await retriever.retrieve("What did Alice say?", top_k=10)
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseVectorDataStore | Data store for vector search operations. | required |
| pii_resolver | BasePIIResolver | PII resolver for anonymization/de-anonymization. | required |
| weights | list[float] \| None | Weights for [filtered, semantic] results. Defaults to [0.2, 0.8]. Weights must be positive and will be normalized to sum to 1.0. | None |
| rank_constant | int | RRF rank constant (k). Defaults to 60. | 60 |
| min_candidate | int | Minimum candidates per search method. Defaults to 1. | 1 |
| metadata_entities_field | str | Field name in metadata containing PII entities. Defaults to "metadata.entities". | METADATA_ENTITIES_FIELD |
Initialize the PIIAwareRetriever.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseVectorDataStore | Data store for vector search operations. | required |
| pii_resolver | BasePIIResolver | PII resolver for anonymization/de-anonymization. | required |
| weights | list[float] \| None | Weights for [filtered, semantic] results. Defaults to [0.2, 0.8]. Must have exactly 2 elements with positive values. | None |
| rank_constant | int | RRF rank constant (k). Must be positive. Defaults to 60. | 60 |
| min_candidate | int | Minimum candidates per search method. Must be positive. Defaults to 1. | 1 |
| metadata_entities_field | str | Field name in metadata containing PII entities. Defaults to "metadata.entities". | METADATA_ENTITIES_FIELD |
Raises:

| Type | Description |
|---|---|
| ValueError | If pii_resolver is not a BasePIIResolver subclass. |
| ValueError | If weights does not have exactly 2 elements or contains non-positive values. |
| ValueError | If rank_constant or min_candidate are not positive integers. |
weighted_reciprocal_rank(doc_lists)
Perform weighted Reciprocal Rank Fusion on multiple rank lists.
This method implements the Weighted Reciprocal Rank Fusion (RRF) algorithm, which combines multiple ranked document lists into a single ranked list. RRF is particularly effective for combining results from different retrieval strategies (e.g., filtered search and semantic search).
The RRF score for each document is calculated as:
score = sum(weight_i / (rank_i + k)) for each list i
where rank_i is the document's rank in list i (1-based), and k is the rank constant.
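The formula can be exercised with a small standalone sketch. Plain string IDs stand in for chunks here, and `weighted_rrf_scores` is an illustrative helper, not the library method:

```python
def weighted_rrf_scores(ranked_lists, weights, k=60):
    """Fuse rankings: score(doc) = sum over lists of weight_i / (rank_i + k), rank_i 1-based."""
    scores = {}
    for ranking, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (rank + k)
    # Highest fused score first.
    return sorted(scores, key=scores.get, reverse=True)
```

With weights [0.2, 0.8], a document ranked first in the semantic list outscores one ranked first only in the filtered list, matching the default bias toward semantic results.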
Examples:

```python
filtered_results = [chunk1, chunk2, chunk3]  # Ranked by entity filtering
semantic_results = [chunk2, chunk1, chunk4]  # Ranked by semantic similarity
fused = retriever.weighted_reciprocal_rank([filtered_results, semantic_results])
# Returns chunks ordered by combined RRF scores
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| doc_lists | list[list[Chunk]] | A list of rank lists to fuse. Must contain exactly 2 lists corresponding to [filtered, semantic] results. Each inner list should contain unique items. | required |
Returns:

| Type | Description |
|---|---|
| list[Chunk] | The final aggregated list of unique documents sorted by their weighted RRF scores in descending order. Documents with higher scores appear first. |
Raises:

| Type | Description |
|---|---|
| ValueError | If the number of rank lists doesn't match the configured weights count. |
Note

- Documents are deduplicated by their `id` field.
- The `rank_constant` parameter controls the influence of rank position.
- Higher `rank_constant` values reduce the impact of rank differences.
ParentDocumentRetriever(child_data_store, parent_data_store, parent_metadata_field=DEFAULT_PARENT_METADATA_FIELD, blank_id_str_set=None, filter_key_id=DEFAULT_FILTER_KEY_ID, parent_top_k=None)
Bases: VectorRetriever
A retriever that retrieves parent chunks based on child chunk similarity search.
The ParentDocumentRetriever queries a child data store using vector similarity search, then uses the metadata from child chunks to retrieve corresponding parent chunks from a parent data store. The parent-child relationship is defined through a configurable metadata field.
If a child chunk does not have a valid parent metadata field, the child chunk itself is included in the results. Duplicate parent chunks are deduplicated while preserving the order from the child retrieval.
Important

- The `top_k` parameter passed to `retrieve` controls the number of child chunks fetched from the vector store, not the number of final parent results.
- Because multiple child chunks may map to the same parent (deduplication) or to distinct parents, the result count can be fewer or more than `top_k`.
- To cap the final output size, set `parent_top_k` during initialization.
Examples:

```python
from gllm_datastore.data_store.elasticsearch.data_store import ElasticsearchDataStore

child_store = ElasticsearchDataStore(...).with_vector(em_invoker=embedding_model)
parent_store = ElasticsearchDataStore(...).with_fulltext()
retriever = ParentDocumentRetriever(
    child_data_store=child_store,
    parent_data_store=parent_store,
)
results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="parent doc 1", ...), Chunk(content="parent doc 2", ...), ...]
```
Attributes:

| Name | Type | Description |
|---|---|---|
| data_store | BaseDataStore | The child data store with vector capability (alias for child_data_store). |
| parent_data_store | BaseDataStore | The parent data store with fulltext capability. |
| parent_metadata_field | str | The metadata field linking child chunks to parent chunks. |
| blank_id_str_set | set[str] | Set of strings considered as blank/invalid parent IDs. |
| filter_key_id | str | The filter key used to query parent chunks by ID. |
| parent_top_k | int \| None | Maximum number of final parent/result chunks to return. |
Initialize the ParentDocumentRetriever with child and parent data stores.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| child_data_store | BaseDataStore | The data store for child chunk similarity search. Must have vector capability registered. | required |
| parent_data_store | BaseDataStore | The data store for parent chunk retrieval by ID. Must have fulltext capability registered. | required |
| parent_metadata_field | str | The metadata field linking child chunks to parent chunks. Defaults to "parent_chunk". | DEFAULT_PARENT_METADATA_FIELD |
| blank_id_str_set | set[str] \| None | Set of strings considered as blank/invalid parent IDs. Defaults to None, which uses {"-", "", " "}. | None |
| filter_key_id | str | The filter key used to query parent chunks by ID. Defaults to "_id". | DEFAULT_FILTER_KEY_ID |
| parent_top_k | int \| None | Maximum number of final parent/result chunks to return. If None, all deduplicated results are returned. Defaults to None. | None |
Raises:
| Type | Description |
|---|---|
| TypeError | If child_data_store or parent_data_store is not an instance of BaseDataStore. |
| ValueError | If child_data_store does not have vector capability registered, or parent_data_store does not have fulltext capability registered. |
VectorRetriever(data_store)
Bases: DatastoreChunkRetriever
A vector retriever using BaseDataStore with vector capability.
This class provides a straightforward implementation of the BaseRetriever, using a BaseDataStore with vector capability for document retrieval via similarity search.
Examples:
from gllm_datastore.core.filters import filter as F
data_store = ElasticsearchDataStore(...).with_vector(em_invoker=embedding_model)
retriever = VectorRetriever(data_store=data_store)
results = await retriever.retrieve("search query", top_k=10)
# results: [Chunk(content="...", score=0.95), Chunk(content="...", score=0.87), ...]
results = await retriever.retrieve(
"search query",
query_filter=F.eq("metadata.category", "AI"),
top_k=10,
threshold=0.8
)
# results: [Chunk(content="...", score=0.92), Chunk(content="...", score=0.85), ...]
batch_results = await retriever.retrieve(["query 1", "query 2"], top_k=10)
# batch_results: [
# [Chunk(content="...", score=0.95), ...], # results for "query 1"
# [Chunk(content="...", score=0.89), ...] # results for "query 2"
# ]
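The batch form shown above returns one result list per query, in input order. A minimal sketch of that concurrent dispatch, assuming a hypothetical `retrieve_one` coroutine in place of the real data store call:

```python
import asyncio

# Illustrative sketch (not the library's code): batch retrieval runs
# one retrieval coroutine per query concurrently and preserves the
# input order of the queries in the output.


async def retrieve_one(query: str) -> list[str]:
    await asyncio.sleep(0)  # stand-in for a data store round trip
    return [f"result for {query!r}"]


async def retrieve_batch(queries: list[str]) -> list[list[str]]:
    # asyncio.gather returns results in the same order as the inputs.
    return await asyncio.gather(*(retrieve_one(q) for q in queries))


print(asyncio.run(retrieve_batch(["query 1", "query 2"])))
# [["result for 'query 1'"], ["result for 'query 2'"]]
```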
Attributes:
| Name | Type | Description |
|---|---|---|
| data_store | BaseDataStore | The data store with vector capability used for retrieval operations. |
Initialize the VectorRetriever with a data store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_store | BaseDataStore | The data store to retrieve from. Must have vector capability registered. | required |
Raises:
| Type | Description |
|---|---|
| TypeError | If data_store is not an instance of BaseDataStore. |
| ValueError | If data_store does not have vector capability registered. |
retrieve(query, query_filter=None, top_k=None, threshold=None, **kwargs)
async
retrieve(query: str, query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[Chunk]
retrieve(query: list[str], query_filter: FilterClause | QueryFilter | None = None, top_k: int | None = None, threshold: float | None = None, **kwargs: Any) -> list[list[Chunk]]
Retrieve documents based on the query using vector similarity search.
This method performs a retrieval operation using the configured data store's vector capability.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str \| list[str] | The query string or list of query strings to retrieve documents. If a list is provided, retrieval is performed for each query concurrently. | required |
| query_filter | FilterClause \| QueryFilter \| None | Filter criteria for the retrieval. Can be a single FilterClause or a composite QueryFilter. Defaults to None. | None |
| top_k | int \| None | The maximum number of documents to retrieve. Defaults to None, which uses the data store's default limit. | None |
| threshold | float \| None | The minimum score threshold for filtering results. Defaults to None, in which case no filtering is applied. | None |
| **kwargs | Any | Additional parameters passed to the data store's retrieve method. | {} |
Returns:
| Type | Description |
|---|---|
| list[Chunk] \| list[list[Chunk]] | A list of retrieved documents sorted by similarity score. Returns list[list[Chunk]] if query is a list of strings. |
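The `threshold` parameter prunes results by minimum score, and no filtering is applied when it is None. A minimal sketch of that behavior, using a hypothetical `apply_threshold` helper and plain dicts in place of Chunk objects:

```python
# Illustrative sketch (not the library's code) of score-threshold
# filtering on similarity-search results, as described above.


def apply_threshold(chunks, threshold=None):
    """Keep chunks whose score meets the minimum threshold."""
    if threshold is None:
        return chunks  # no filtering when threshold is unset
    return [c for c in chunks if c["score"] >= threshold]


scored = [
    {"content": "doc a", "score": 0.95},
    {"content": "doc b", "score": 0.72},
    {"content": "doc c", "score": 0.88},
]
print(apply_threshold(scored, threshold=0.8))
# keeps only the chunks scoring 0.95 and 0.88
```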