Capabilities

Capability base classes and models for datastore interface.

This package defines the core capability base classes and related models used by datastores.

BaseFulltextCapability(encryption=None, default_batch_size=None)

Bases: DataStoreCapability

Base class for fulltext capability implementations.

Handles encryption and batching transparently. Subclasses implement the internal CRUD methods; these receive plaintext data, or already-encrypted data when encryption is enabled.

create(data, batch_size=None, **kwargs) async

Create records with automatic encryption and batching.

Parameters:

Name Type Description Default
data Chunk | list[Chunk]

Single chunk or list of chunks to create.

required
batch_size int | None

Override batch size for this call. Defaults to None.

None
**kwargs Any

Passed to subclass _create.

{}
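The batching described for create can be pictured with a small sketch. This is not the library's implementation: `split_into_batches` and `normalize` are hypothetical helpers, and treating a `batch_size` of None as "one single batch" is an assumption made only for illustration.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def normalize(data):
    """Accept a single item or a list, mirroring the `Chunk | list[Chunk]` parameter."""
    return data if isinstance(data, list) else [data]


def split_into_batches(items: Sequence[T], batch_size: Optional[int]) -> list:
    """Split items into consecutive batches (hypothetical helper, not library API)."""
    if batch_size is None:
        # Assumption: no configured batch size means everything goes in one batch.
        return [list(items)] if items else []
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]
```

Each resulting batch would then be handed to the subclass `_create` in turn.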

retrieve_fuzzy(query, max_distance=2, filters=None, options=None, **kwargs) async

Find records that fuzzy-match the query within the given edit-distance threshold, with automatic decryption.

Parameters:

Name Type Description Default
query str

Text to fuzzy match against.

required
max_distance int

Maximum edit distance for matches (e.g. Levenshtein). Defaults to 2.

2
filters FilterClause | QueryFilter | None

Optional metadata filters. Defaults to None.

None
options QueryOptions | None

Query options (limit, etc.). Defaults to None.

None
**kwargs Any

Passed to subclass _retrieve_fuzzy.

{}

Returns:

Type Description
list[Chunk]

list[Chunk]: Matched chunks ordered by relevance/distance, decrypted when encryption is enabled.
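To make `max_distance` concrete, here is a minimal pure-Python sketch of Levenshtein distance and a distance-bounded filter. Real backends typically compute fuzzy matches inside the datastore engine, so this only illustrates the threshold semantics; `levenshtein` and `fuzzy_filter` are hypothetical names, and plain strings stand in for chunks.

```python
def levenshtein(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance between two strings."""
    if len(a) < len(b):
        a, b = b, a  # keep the inner row as short as possible
    previous = list(range(len(b) + 1))
    for i, ch_a in enumerate(a, start=1):
        current = [i]
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (or match)
            ))
        previous = current
    return previous[-1]


def fuzzy_filter(query: str, candidates: list, max_distance: int = 2) -> list:
    """Keep candidates within max_distance of query, closest first (illustrative only)."""
    scored = sorted((levenshtein(query, c), c) for c in candidates)
    return [c for distance, c in scored if distance <= max_distance]
```

With the default `max_distance=2`, a query such as "color" would keep "colour" and "colr" (one edit each) and drop "dollar" (three edits).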

BaseGraphCapability

Bases: ABC

Base class for graph database operations.

This base class defines the interface for datastores that support graph-based data operations. This includes node and relationship management as well as graph queries.

delete_node(label, identifier_key, identifier_value) abstractmethod async

Delete a node and its relationships.

Parameters:

Name Type Description Default
label str

Node label/type.

required
identifier_key str

Node identifier key.

required
identifier_value str

Node identifier value.

required

Returns:

Name Type Description
Any Any

Deletion result information.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value) abstractmethod async

Delete a relationship between nodes.

Parameters:

Name Type Description Default
node_source_key str

Source node identifier key.

required
node_source_value str

Source node identifier value.

required
relation str

Relationship type.

required
node_target_key str

Target node identifier key.

required
node_target_value str

Target node identifier value.

required

Returns:

Name Type Description
Any Any

Deletion result information.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

retrieve(query, parameters=None) abstractmethod async

Retrieve data from the graph using the given query.

Parameters:

Name Type Description Default
query str

Query to retrieve data from the graph.

required
parameters dict[str, Any] | None

Query parameters. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Query results as list of dictionaries.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

upsert_node(label, identifier_key, identifier_value, properties=None) abstractmethod async

Create or update a node in the graph.

Parameters:

Name Type Description Default
label str

Node label/type.

required
identifier_key str

Key field for node identification.

required
identifier_value str

Value for node identification.

required
properties dict[str, Any] | None

Additional node properties. Defaults to None.

None

Returns:

Name Type Description
Any Any

Created/updated node information.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

upsert_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value, properties=None) abstractmethod async

Create or update a relationship between nodes.

Parameters:

Name Type Description Default
node_source_key str

Source node identifier key.

required
node_source_value str

Source node identifier value.

required
relation str

Relationship type.

required
node_target_key str

Target node identifier key.

required
node_target_value str

Target node identifier value.

required
properties dict[str, Any] | None

Relationship properties. Defaults to None.

None

Returns:

Name Type Description
Any Any

Created/updated relationship information.

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.
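The upsert and delete semantics above can be illustrated with an in-memory toy. It mirrors the documented method signatures but does not subclass the real BaseGraphCapability (its import path is not given here), and the returned dictionaries are an assumption, since the reference only states that the return type is Any. Relationships are matched on identifier key and value alone, because the documented relationship methods carry no label.

```python
import asyncio
from typing import Any


class InMemoryGraph:
    """Toy stand-in mirroring the documented signatures (not library code)."""

    def __init__(self) -> None:
        # Nodes keyed by (label, identifier_key, identifier_value).
        self.nodes = {}
        # Relationships keyed by (source_key, source_value, relation, target_key, target_value).
        self.relationships = {}

    async def upsert_node(self, label, identifier_key, identifier_value, properties=None):
        key = (label, identifier_key, identifier_value)
        node = self.nodes.setdefault(key, {identifier_key: identifier_value})
        node.update(properties or {})  # merge new properties into any existing node
        return node

    async def upsert_relationship(self, node_source_key, node_source_value, relation,
                                  node_target_key, node_target_value, properties=None):
        key = (node_source_key, node_source_value, relation,
               node_target_key, node_target_value)
        rel = self.relationships.setdefault(key, {})
        rel.update(properties or {})
        return rel

    async def delete_node(self, label, identifier_key, identifier_value):
        removed = self.nodes.pop((label, identifier_key, identifier_value), None)
        ident = (identifier_key, identifier_value)
        # Drop every relationship whose source or target is the deleted node.
        stale = [k for k in self.relationships if k[:2] == ident or k[3:] == ident]
        for k in stale:
            del self.relationships[k]
        return {"node_deleted": removed is not None, "relationships_deleted": len(stale)}


async def demo() -> Any:
    graph = InMemoryGraph()
    await graph.upsert_node("Person", "id", "1", {"name": "Ada"})
    await graph.upsert_node("Person", "id", "2", {"name": "Grace"})
    await graph.upsert_relationship("id", "1", "KNOWS", "id", "2", {"since": 1950})
    return await graph.delete_node("Person", "id", "1")
```

Running `asyncio.run(demo())` exercises the "delete a node and its relationships" behaviour: the node and the one relationship touching it are both removed.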

BaseHybridCapability(encryption=None, default_batch_size=None)

Bases: DataStoreCapability

Base class for hybrid capability implementations.

create(chunks, batch_size=None, **kwargs) async

Create chunks with automatic encryption and batching.

Parameters:

Name Type Description Default
chunks list[Chunk]

Chunks to create and index.

required
batch_size int | None

Override batch size. Defaults to None.

None
**kwargs Any

Passed to subclass _create.

{}

create_from_vector(chunks, dense_vectors=None, batch_size=None, **kwargs) async

Create from pre-computed vectors with encryption and batching.

Parameters:

Name Type Description Default
chunks list[Chunk]

Chunks to index.

required
dense_vectors dict[str, list[tuple[Chunk, Vector]]] | None

Per-field vectors. Defaults to None.

None
batch_size int | None

Override batch size; controls actual batching. Defaults to None.

None
**kwargs Any

Passed to subclass.

{}

retrieve(query, filters=None, options=None, **kwargs) async

Retrieve using hybrid search with automatic decryption.

Parameters:

Name Type Description Default
query str

Query text to search with.

required
filters FilterClause | QueryFilter | None

Query filters to apply. Defaults to None.

None
options QueryOptions | None

Query options like limit and sorting. Defaults to None.

None
**kwargs Any

Additional arguments passed to _retrieve.

{}

Returns:

Type Description
list[Chunk]

list[Chunk]: Decrypted query results.

retrieve_by_vector(query=None, dense_vectors=None, filters=None, options=None, **kwargs) async

Retrieve by pre-computed vectors with automatic decryption.

Parameters:

Name Type Description Default
query str | None

Optional query text. Defaults to None.

None
dense_vectors dict[str, Vector] | None

Field name to query vector. Defaults to None.

None
filters FilterClause | QueryFilter | None

Filters. Defaults to None.

None
options QueryOptions | None

Query options. Defaults to None.

None
**kwargs Any

Passed to _retrieve_by_vector.

{}

Returns:

Type Description
list[Chunk]

list[Chunk]: Decrypted chunks.

BaseVectorCapability(em_invoker, encryption=None, default_batch_size=None)

Bases: DataStoreCapability

Base class for vector capability implementations.

Provides default batching/encryption flows for create, create_from_vector, retrieve, retrieve_by_vector, update, delete, and clear.

Initialize the base vector capability.

Parameters:

Name Type Description Default
em_invoker BaseEMInvoker

Embedding model invoker (required).

required
encryption EncryptionCapability | None

Encryption capability. Defaults to None.

None
default_batch_size int | None

Default batch size. Defaults to None.

None

em_invoker property

Return the embedding model invoker.

Returns:

Name Type Description
BaseEMInvoker BaseEMInvoker

The EM invoker instance.

create(data, batch_size=None, **kwargs) async

Create records with automatic encryption and batching.

Parameters:

Name Type Description Default
data Chunk | list[Chunk]

Single chunk or list of chunks.

required
batch_size int | None

Override batch size. Defaults to None.

None
**kwargs Any

Passed to subclass.

{}

create_from_vector(chunk_vectors, batch_size=None, **kwargs) async

Create from pre-computed vectors with encryption and batching.

Parameters:

Name Type Description Default
chunk_vectors list[tuple[Chunk, Vector]]

Chunks and their vectors.

required
batch_size int | None

Override batch size. Defaults to None.

None
**kwargs Any

Passed to subclass.

{}

ensure_index(**kwargs) abstractmethod async

Ensure vector index exists.

Parameters:

Name Type Description Default
**kwargs Any

Datastore-specific parameters.

{}

Raises:

Type Description
NotImplementedError

If the subclass does not implement this method.

retrieve_by_vector(vector, filters=None, options=None, **kwargs) async

Retrieve by vector with automatic decryption.

Parameters:

Name Type Description Default
vector Vector

Query vector.

required
filters FilterClause | QueryFilter | None

Filters. Defaults to None.

None
options QueryOptions | None

Query options. Defaults to None.

None
**kwargs Any

Passed to _retrieve_by_vector.

{}

Returns:

Type Description
list[Chunk]

list[Chunk]: Decrypted chunks.

update(update_values, filters=None, batch_size=None, **kwargs) async

Update records with centralized encryption and content embedding refresh.

Parameters:

Name Type Description Default
update_values dict[str, Any]

Fields to update.

required
filters FilterClause | QueryFilter | None

Filters. Defaults to None.

None
batch_size int | None

Optional batch size override. Falls back to DefaultBatchSize.UPDATE when no batch size is configured for the request or on the capability.

None
**kwargs Any

Passed to backend-specific _update implementation.

{}

DataStoreCapability(encryption=None, default_batch_size=None)

Bases: ABC

Base class for capability implementations that share encryption and batching.

Holds common state (encryption, default_batch_size) and helpers used by BaseFulltextCapability, BaseVectorCapability, and BaseHybridCapability. Datastore implementations should not inherit from this class directly; inherit from the specific base (BaseFulltextCapability, etc.) instead.

Initialize the data store capability.

Parameters:

Name Type Description Default
encryption EncryptionCapability | None

Encryption capability. Defaults to None.

None
default_batch_size int | None

Default batch size. Defaults to None.

None

clear(**kwargs) async

Clear all records. Delegates to subclass _clear.

Parameters:

Name Type Description Default
**kwargs Any

Passed to _clear.

{}

delete(filters=None, options=None, **kwargs) async

Delete records that match the given filter. Delegates to subclass _delete.

Parameters:

Name Type Description Default
filters FilterClause | QueryFilter | None

Deletion criteria; only records matching this filter are removed. Passed to _delete. Defaults to None.

None
options QueryOptions | None

Query options (e.g. ordering/limit for eviction-style deletes). Passed to _delete. Defaults to None.

None
**kwargs Any

Passed to _delete.

{}

Returns:

Name Type Description
Any Any

Backend-specific delete metadata when available. Returns None for no-op deletes or backends that do not expose delete metadata.

retrieve(*args, **kwargs) async

Retrieve records with automatic decryption. Delegates to _retrieve then decrypts.

Parameters:

Name Type Description Default
*args Any

Passed to _retrieve (signature is capability-specific).

()
**kwargs Any

Passed to _retrieve.

{}

Returns:

Type Description
list[Chunk]

list[Chunk]: Decrypted chunks.
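The "delegate to `_retrieve`, then decrypt" flow is a template-method pattern. The sketch below shows the shape of it under loud assumptions: `SketchCapability` and `ReversingEncryption` are hypothetical stand-ins, plain strings take the place of Chunk objects, and reversing text is used only as a visible placeholder for decryption.

```python
import asyncio
from typing import Any, Optional


class ReversingEncryption:
    """Toy stand-in for an encryption capability: 'decrypts' by reversing text."""

    def decrypt_chunks(self, chunks: list) -> list:
        return [c[::-1] for c in chunks]


class SketchCapability:
    """Hypothetical capability showing the public/internal method split."""

    def __init__(self, encryption: Optional[ReversingEncryption] = None) -> None:
        self._encryption = encryption
        self._stored = ["olleh", "dlrow"]  # pretend these are stored ciphertexts

    async def _retrieve(self, *args: Any, **kwargs: Any) -> list:
        # A backend-specific lookup would live here; this one returns everything.
        return list(self._stored)

    async def retrieve(self, *args: Any, **kwargs: Any) -> list:
        raw = await self._retrieve(*args, **kwargs)
        if self._encryption is None:
            return raw  # nothing to decrypt
        return self._encryption.decrypt_chunks(raw)
```

The point of the split is that a backend author only writes `_retrieve`; callers always go through `retrieve` and never see ciphertext when encryption is configured.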

update(update_values, filters=None, batch_size=None, **kwargs) async

Update records with automatic encryption of update_values.

Parameters:

Name Type Description Default
update_values dict[str, Any]

Fields to update.

required
filters FilterClause | QueryFilter | None

Filters. Defaults to None.

None
batch_size int | None

Optional batch size override. Falls back to DefaultBatchSize.UPDATE when no batch size is configured for the request or on the capability.

None
**kwargs Any

Passed to _update.

{}
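One plausible reading of the `batch_size` fallback is a three-step resolution: the per-call value, then the capability's `default_batch_size`, then `DefaultBatchSize.UPDATE`. The sketch below encodes that reading. The precedence order is an assumption, `resolve_batch_size` is a hypothetical helper, and the enum value 100 is a placeholder because the real value is not stated in this reference.

```python
from enum import IntEnum
from typing import Optional


class DefaultBatchSize(IntEnum):
    """Placeholder enum; the library's real values are not given in this reference."""

    UPDATE = 100  # hypothetical value, chosen only for illustration


def resolve_batch_size(request: Optional[int], capability_default: Optional[int]) -> int:
    """Pick the per-call value first, then the capability default, then the fallback."""
    if request is not None:
        return request
    if capability_default is not None:
        return capability_default
    return int(DefaultBatchSize.UPDATE)
```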

update_encryption(encryption)

Update the encryption capability.

Parameters:

Name Type Description Default
encryption EncryptionCapability

New encryption capability.

required

Raises:

Type Description
TypeError

If the provided encryption is not an instance of EncryptionCapability.

EncryptionCapability(encryptor, encrypted_fields)

Unified implementation of encryption capability.

This class provides the shared encryption and decryption logic that is identical across all backend implementations. It handles:

- Chunk content and metadata encryption/decryption
- Preparation of encrypted chunks with plaintext embeddings
- Encryption of update values

Thread Safety

This class is designed to be thread-safe when used with thread-safe encryptors. The encryptor instance passed must be thread-safe for concurrent encryption/decryption operations. Methods in this class do not perform internal synchronization; thread safety is delegated to the underlying encryptor.

Attributes:

Name Type Description
encryptor BaseEncryptor

The encryptor instance to use for encryption/decryption. Must be thread-safe for concurrent operations.

_encrypted_fields set[str]

The set of fields to encrypt.

Initialize the encryption capability.

Parameters:

Name Type Description Default
encryptor BaseEncryptor

The encryptor instance to use for encryption.

required
encrypted_fields set[str]

The set of fields to encrypt. Supports:

1. Content field: "content"
2. Metadata fields using dot notation: "metadata.secret_key", "metadata.secret_value"

Example: {"content", "metadata.secret_key", "metadata.secret_value"}

required
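The dot-notation convention for `encrypted_fields` can be illustrated as follows. This is a sketch, not the class's actual logic: `encrypt_selected_fields` is a hypothetical helper, a plain dictionary stands in for a Chunk, and the `encrypt` callable is supplied by the caller in place of a real encryptor.

```python
from typing import Any, Callable


def encrypt_selected_fields(
    record: dict,
    encrypted_fields: set,
    encrypt: Callable[[str], str],
) -> dict:
    """Return a copy of record with only the configured fields encrypted."""
    # Copy the metadata dict so the caller's record is left untouched.
    result = {**record, "metadata": dict(record.get("metadata", {}))}
    for field in encrypted_fields:
        if field == "content" and "content" in result:
            result["content"] = encrypt(str(result["content"]))
        elif field.startswith("metadata."):
            key = field.split(".", 1)[1]  # "metadata.secret_key" -> "secret_key"
            if key in result["metadata"]:
                result["metadata"][key] = encrypt(str(result["metadata"][key]))
    return result
```

Fields that are configured but absent from a record (for example "metadata.secret_value" on a record without that key) are simply skipped in this sketch; how the library treats that case is not stated here.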

encryption_config property

Get the current encryption configuration.

Returns:

Type Description
set[str]

set[str]: Set of encrypted field names.

is_enabled property

Check if encryption is enabled (has configured fields).

Returns:

Name Type Description
bool bool

True if encryption fields are configured, False otherwise.

decrypt_chunks(chunks)

Decrypt chunks if encryption is enabled.

Parameters:

Name Type Description Default
chunks list[Chunk]

List of chunks to decrypt.

required

Returns:

Type Description
list[Chunk]

list[Chunk]: List of decrypted chunks.

decrypt_field(value)

Decrypt a single field value.

Parameters:

Name Type Description Default
value str

The encrypted value to decrypt.

required

Returns:

Name Type Description
str str

Decrypted value.

encrypt_chunks(chunks)

Encrypt chunks if encryption is enabled.

Parameters:

Name Type Description Default
chunks list[Chunk]

List of chunks to encrypt.

required

Returns:

Type Description
list[Chunk]

list[Chunk]: List of encrypted chunks.

encrypt_embedded_chunks(chunks, em_invoker) async

Encrypt chunks and generate embeddings from plaintext before encryption.

Generates embeddings from plaintext content to ensure embeddings represent the original content rather than encrypted ciphertext. This is used when encryption is enabled.

Parameters:

Name Type Description Default
chunks list[Chunk]

List of chunks to encrypt and generate embeddings for.

required
em_invoker BaseEMInvoker

Embedding model invoker to generate embeddings.

required

Returns:

Type Description
list[tuple[Chunk, Vector]]

list[tuple[Chunk, Vector]]: List of tuples containing encrypted chunks and their corresponding vectors generated from plaintext.
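The ordering matters here: embeddings are computed while the plaintext is still available, and only then is the content encrypted. A minimal sketch of that ordering, with everything hypothetical: `fake_embed` stands in for an embedding model invoker, `fake_encrypt` for an encryptor, and strings for chunks.

```python
import asyncio


async def fake_embed(texts: list) -> list:
    """Hypothetical embedder: a one-dimensional vector holding the text length."""
    return [[float(len(t))] for t in texts]


def fake_encrypt(text: str) -> str:
    """Hypothetical cipher stand-in whose output is longer than its input."""
    return "enc:" + text[::-1]


async def encrypt_embedded(texts: list) -> list:
    # Step 1: embed first, while the plaintext is still available.
    vectors = await fake_embed(texts)
    # Step 2: encrypt afterwards; the vectors are left untouched.
    encrypted = [fake_encrypt(t) for t in texts]
    return list(zip(encrypted, vectors))
```

Because the toy embedder depends on text length and the toy cipher changes that length, swapping the two steps would produce different vectors, which is exactly the failure the documented method avoids.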

encrypt_field(value)

Encrypt a single field value.

Parameters:

Name Type Description Default
value str

The value to encrypt.

required

Returns:

Name Type Description
str str

Encrypted value.

encrypt_update_values(update_values, content_field_name=CHUNK_KEYS.CONTENT)

Encrypt update values if encryption is enabled.

This method encrypts content and metadata values in update_values according to the encryption configuration. It handles type conversion for non-string values before encryption.

Parameters:

Name Type Description Default
update_values dict[str, Any]

Dictionary of values to encrypt. Supports "content" and "metadata" keys.

required
content_field_name str

The field name to use for content in the output. Defaults to CHUNK_KEYS.CONTENT. Useful for datastores like Elasticsearch that use "text" instead of "content".

CHUNK_KEYS.CONTENT

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Dictionary with encrypted values where applicable. The "content" key is mapped to content_field_name in the output.

Raises:

Type Description
ValueError

If encryption fails for any field.
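A sketch of how the `content_field_name` mapping and the non-string conversion could fit together. It is illustrative only: `encrypt_update_values_sketch` is a hypothetical function, the `encrypt` callable replaces a real encryptor, and converting non-string values with `str()` is an assumption about what "type conversion" means here.

```python
from typing import Any, Callable


def encrypt_update_values_sketch(
    update_values: dict,
    encrypted_fields: set,
    encrypt: Callable[[str], str],
    content_field_name: str = "content",
) -> dict:
    out: dict = {}
    for key, value in update_values.items():
        if key == "content":
            if "content" in encrypted_fields:
                value = encrypt(str(value))
            # The output key is renamed whether or not the value was encrypted.
            out[content_field_name] = value
        elif key == "metadata":
            out["metadata"] = {
                # Assumption: non-string values are stringified before encryption.
                k: encrypt(str(v)) if f"metadata.{k}" in encrypted_fields else v
                for k, v in value.items()
            }
        else:
            out[key] = value  # keys outside content/metadata pass through unchanged
    return out
```

Passing `content_field_name="text"` matches the Elasticsearch case mentioned above: the caller still supplies "content", and the encrypted value comes back under "text".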

HybridSearchType

Bases: StrEnum

Types of searches that can be combined in hybrid search.

SearchConfig

Bases: BaseModel

Configuration for a single search component in hybrid search.

Examples:

FULLTEXT search configuration:

    config = SearchConfig(
        search_type=HybridSearchType.FULLTEXT,
        field="text",
        weight=0.3
    )

VECTOR search configuration:

    config = SearchConfig(
        search_type=HybridSearchType.VECTOR,
        field="embedding",
        em_invoker=em_invoker,
        weight=0.5
    )

Attributes:

Name Type Description
search_type HybridSearchType

Type of search (FULLTEXT or VECTOR).

field str

Field name in the index (e.g., "text", "embedding").

weight float

Weight for this search in hybrid search. Defaults to 1.0.

em_invoker BaseEMInvoker | None

Embedding model invoker required for VECTOR type. Defaults to None.

top_k int | None

Per-search top_k limit (optional). Defaults to None.

extra_kwargs dict[str, Any]

Additional search-specific parameters. Defaults to empty dict.

validate_field_not_empty(v) classmethod

Validate that field name is not empty.

Parameters:

Name Type Description Default
v str

Field name value.

required

Returns:

Name Type Description
str str

Validated field name.

Raises:

Type Description
ValueError

If field name is empty.

validate_search_requirements()

Validate configuration based on search type.

Returns:

Name Type Description
SearchConfig SearchConfig

Validated configuration instance.

Raises:

Type Description
ValueError

If required fields are missing for the search type.

validate_top_k(v) classmethod

Validate that top_k is positive if provided.

Parameters:

Name Type Description Default
v int | None

top_k value.

required

Returns:

Type Description
int | None

int | None: Validated top_k value.

Raises:

Type Description
ValueError

If top_k is provided but not positive.
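The three validators can be summarized in one standard-library sketch. The real SearchConfig is a pydantic BaseModel; a dataclass is swapped in here so the example runs without third-party packages. `SearchConfigSketch` and `SearchType` are hypothetical names, the enum member values are assumptions, and the attribute is called `field_name` only to avoid shadowing `dataclasses.field`.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class SearchType(str, Enum):
    """Stand-in for HybridSearchType; the member values are assumptions."""

    FULLTEXT = "fulltext"
    VECTOR = "vector"


@dataclass
class SearchConfigSketch:
    search_type: SearchType
    field_name: str  # corresponds to the documented `field` attribute
    weight: float = 1.0
    em_invoker: Optional[Any] = None
    top_k: Optional[int] = None
    extra_kwargs: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Mirrors validate_field_not_empty.
        if not self.field_name:
            raise ValueError("field must not be empty")
        # Mirrors validate_top_k.
        if self.top_k is not None and self.top_k <= 0:
            raise ValueError("top_k must be positive when provided")
        # Mirrors validate_search_requirements for the VECTOR case.
        if self.search_type is SearchType.VECTOR and self.em_invoker is None:
            raise ValueError("em_invoker is required for VECTOR search")
```

Constructing `SearchConfigSketch(SearchType.VECTOR, "embedding")` without an `em_invoker` raises ValueError, while a FULLTEXT configuration needs only a non-empty field name.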