Graph Data Store

Modules containing graph data store implementations to be used in Gen AI applications.

LightRAGPostgresDataStore(lm_invoker, em_invoker, postgres_db_host='localhost', postgres_db_port=5432, postgres_db_user='postgres', postgres_db_password='password', postgres_db_name='postgres', postgres_db_workspace='default', use_cache=False, lm_invoke_kwargs=None, instance=None, **kwargs)

Bases: BaseLightRAGDataStore

Data store implementation for LightRAG-based graph RAG using PostgreSQL.

This class extends BaseLightRAGDataStore to use PostgreSQL as the graph database, key-value store, and vector database.

To use this data store, make sure you have a PostgreSQL instance with the AGE and PGVector extensions installed. The following docker run command starts a PostgreSQL container with both extensions:

docker run -p 5455:5432 -d --name postgres-LightRag shangor/postgres-for-rag:v1.0 sh -c "service postgresql start && sleep infinity"
Example
from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_inference.lm_invoker import OpenAILMInvoker
from gllm_datastore.graph_data_store.light_rag_postgres_data_store import LightRAGPostgresDataStore

# Create the data store
data_store = await LightRAGPostgresDataStore(
    lm_invoker=OpenAILMInvoker(model_name="gpt-4o-mini"),
    em_invoker=OpenAIEMInvoker(model_name="text-embedding-3-small"),
    postgres_db_user="rag",
    postgres_db_password="rag",
    postgres_db_name="rag",
    postgres_db_host="localhost",
    postgres_db_port=5455,
)

# Retrieve using LightRAG instance
await data_store.query("What is AI?")

Attributes:

Name Type Description
instance LightRAG

The LightRAG instance used for indexing and querying.

lm_invoker_adapter LightRAGLMInvokerAdapter

The adapter for the LM invoker.

em_invoker_adapter LightRAGEMInvokerAdapter

The adapter for the EM invoker.

postgres_config PostgresDBConfig

Pydantic model containing PostgreSQL configuration parameters.

Initialize the LightRAGPostgresDataStore.

Parameters:

Name Type Description Default
lm_invoker BaseLMInvoker

The LM invoker to use.

required
em_invoker BaseEMInvoker

The EM invoker to use.

required
postgres_db_host str

The host for the PostgreSQL database. Defaults to "localhost".

'localhost'
postgres_db_port int

The port for the PostgreSQL database. Defaults to 5432.

5432
postgres_db_user str

The user for the PostgreSQL database. Defaults to "postgres".

'postgres'
postgres_db_password str

The password for the PostgreSQL database. Defaults to "password".

'password'
postgres_db_name str

The name for the PostgreSQL database. Defaults to "postgres".

'postgres'
postgres_db_workspace str

The workspace for the PostgreSQL database. Defaults to "default".

'default'
use_cache bool

Whether to enable caching for the LightRAG instance. Defaults to False.

False
lm_invoke_kwargs dict[str, Any] | None

Keyword arguments for the LM invoker. Defaults to None.

None
instance LightRAG | None

A configured LightRAG instance to use. Defaults to None.

None
**kwargs Any

Additional keyword arguments.

{}
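
The connection parameters above are plain keyword arguments, so they can be collected from the environment before the store is constructed. A minimal sketch, assuming hypothetical environment variable names (`POSTGRES_HOST` and the others are not defined by the library); only the returned keys and the fallback values mirror the documented parameters and defaults:

```python
import os


def postgres_kwargs_from_env(env=None):
    """Collect LightRAGPostgresDataStore connection kwargs from environment variables.

    The POSTGRES_* variable names are hypothetical; only the returned keys
    and the fallback defaults come from the parameter table above.
    """
    env = os.environ if env is None else env
    return {
        "postgres_db_host": env.get("POSTGRES_HOST", "localhost"),
        "postgres_db_port": int(env.get("POSTGRES_PORT", "5432")),
        "postgres_db_user": env.get("POSTGRES_USER", "postgres"),
        "postgres_db_password": env.get("POSTGRES_PASSWORD", "password"),
        "postgres_db_name": env.get("POSTGRES_DB", "postgres"),
        "postgres_db_workspace": env.get("POSTGRES_WORKSPACE", "default"),
    }


# Override only the port, matching the docker command that maps 5455 to 5432.
kwargs = postgres_kwargs_from_env({"POSTGRES_PORT": "5455"})
```

The resulting dictionary could then be unpacked into the constructor alongside `lm_invoker` and `em_invoker`.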

LlamaIndexNeo4jGraphRAGDataStore(*args, **kwargs)

Bases: LlamaIndexGraphRAGDataStore, Neo4jPropertyGraphStore

Graph RAG data store for Neo4j.

This class extends the Neo4jPropertyGraphStore class from LlamaIndex and provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.

Attributes:

Name Type Description
neo4j_version_tuple tuple[int, ...]

The Neo4j version tuple.

Example
store = LlamaIndexNeo4jGraphRAGDataStore(
    url="bolt://localhost:7687",
    username="neo4j",
    password="password"
)
# Perform RAG query
results = await store.query("What is the relationship between X and Y?")

# Delete document data
await store.delete_by_document_id("doc123")

Initialize the LlamaIndexNeo4jGraphRAGDataStore.

Parameters:

Name Type Description Default
*args

Variable length argument list.

()
**kwargs

Arbitrary keyword arguments.

{}

delete_by_document_id(document_id, **kwargs) async

Delete nodes and edges by document ID.

Parameters:

Name Type Description Default
document_id str

The document ID.

required
**kwargs Any

Additional keyword arguments.

{}

NebulaGraphDataStore(url, port, user, password, space, operation_wait_time=5)

Bases: BaseGraphDataStore

Implementation of BaseGraphDataStore for Nebula Graph.

This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Nebula graph databases.

Attributes:

Name Type Description
connection_pool ConnectionPool

The connection pool for Nebula Graph.

space str

The space name.

user str

The username.

password str

The password.

operation_wait_time int

The operation wait time in seconds.

Example
store = NebulaGraphDataStore(
    url="127.0.0.1",
    port=9669,
    user="root",
    password="nebula",
    space="testing"
)
# Perform query
results = await store.query("MATCH (n) RETURN n")

# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})

Initialize NebulaGraphDataStore.

Parameters:

Name Type Description Default
url str

The URL of the graph store.

required
port int

The port of the graph store.

required
user str

The user of the graph store.

required
password str

The password of the graph store.

required
space str

The space name.

required
operation_wait_time int

The operation wait time in seconds. Defaults to 5.

5

close() async

Close the graph data store.
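
Because `close()` is a coroutine, it is easy to skip when an earlier call raises. A minimal sketch of one way to guarantee cleanup, using a stand-in class (`FakeStore` and `closing_store` are hypothetical; `FakeStore` only mimics the async `close()` method documented above):

```python
import asyncio
from contextlib import asynccontextmanager


class FakeStore:
    """Hypothetical stand-in exposing only an async close(), like the data store."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def closing_store(store):
    """Yield the store and always await its close(), even if the body raises."""
    try:
        yield store
    finally:
        await store.close()


async def demo():
    store = FakeStore()
    try:
        async with closing_store(store):
            raise RuntimeError("query failed")  # simulate a failing operation
    except RuntimeError:
        pass
    return store.closed
```

With a real store, the same wrapper would be used as `async with closing_store(store): ...` around the query and upsert calls.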

delete_node(label, identifier_key, identifier_value) async

Delete a node from the graph.

Parameters:

Name Type Description Default
label str

The label of the node.

required
identifier_key str

The key of the identifier.

required
identifier_value str

The identifier of the node.

required

Returns:

Name Type Description
Any Any

The result of the operation.

delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value) async

Delete a relationship between two nodes in the graph.

Parameters:

Name Type Description Default
node_source_key str

The key of the source node.

required
node_source_value str

The identifier of the source node.

required
relation str

The type of the relationship.

required
node_target_key str

The key of the target node.

required
node_target_value str

The identifier of the target node.

required

Returns:

Name Type Description
Any Any

The result of the operation.

get_nodes(label=None) async

Get all nodes with optional label filter.

Parameters:

Name Type Description Default
label str | None

The label of the nodes. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

The result of the query.

get_relationships(source_value=None, relation=None) async

Get relationships with optional filters.

Parameters:

Name Type Description Default
source_value str | None

The source vertex identifier. Defaults to None.

None
relation str | None

The relationship type. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

The result of the query.

query(query, parameters=None) async

Query the graph store.

Parameters:

Name Type Description Default
query str

The query to be executed.

required
parameters dict[str, Any] | None

The parameters of the query. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

The result of the query.

traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3) async

Traverse the graph from a node with the specified properties, ignoring relationship direction, up to a given depth.

Example
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    extracted_node_properties=["name", "age"],
    extracted_relationship_properties=["since"],
    depth=1
)

This call starts from the node whose name property equals "John Doe", traverses the graph up to depth 1, and extracts the name and age properties from nodes and the since property from relationships.

nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    depth=2
)

This call starts from the node whose name property equals "John Doe", traverses the graph up to depth 2, and extracts all properties from nodes and relationships.

Parameters:

Name Type Description Default
node_properties dict[str, Any]

The properties of the starting node.

required
extracted_node_properties list[str] | None

The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None.

None
extracted_relationship_properties list[str] | None

The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None.

None
depth int

The depth of traversal. Defaults to 3.

3

Returns:

Type Description
tuple[list[dict[str, Any]], list[dict[str, Any]]]

A tuple containing two lists:
- List of nodes with their extracted properties (including the source node).
- List of relationships with their extracted properties.

Example return value:

nodes = [
    {
        "id": 1001,
        "labels": ["Person"],
        "properties": {"name": "John Doe", "age": 30, "occupation": "Engineer"}
    },
    {
        "id": 2001,
        "labels": ["Company"],
        "properties": {"name": "TechCorp", "industry": "Technology", "employees": 500}
    }
]

relationships = [
    {
        "id": 5002,
        "type": "FRIEND_OF",
        "start_node": 1001,
        "end_node": 1002,
        "properties": {"since": "2018-05-20", "closeness": 8}
    }
]

Raises:

Type Description
ValueError

If node_properties is empty or depth is less than 1.
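
The traversal semantics described above (match a start node by properties, follow relationships in either direction, stop at `depth`, optionally project properties) can be illustrated on plain Python data. This is an in-memory sketch under those stated assumptions, not the query the data store sends to the database; `traverse_in_memory` and its sample inputs are hypothetical and reuse the shapes from the example return value.

```python
from collections import deque


def _project(properties, keys):
    """Keep only the requested keys; None or an empty list keeps everything."""
    if not keys:
        return dict(properties)
    return {k: v for k, v in properties.items() if k in keys}


def traverse_in_memory(nodes, relationships, node_properties,
                       extracted_node_properties=None,
                       extracted_relationship_properties=None, depth=3):
    if not node_properties or depth < 1:
        raise ValueError("node_properties must be non-empty and depth must be >= 1")

    by_id = {n["id"]: n for n in nodes}
    # Start from every node whose properties match all requested key/value pairs.
    start = [n["id"] for n in nodes
             if all(n["properties"].get(k) == v for k, v in node_properties.items())]

    seen_nodes, seen_rels = set(start), set()
    queue = deque((node_id, 0) for node_id in start)
    while queue:
        node_id, dist = queue.popleft()
        if dist == depth:
            continue  # do not expand beyond the requested depth
        for rel in relationships:
            # Direction is ignored: follow the edge from either endpoint.
            if node_id not in (rel["start_node"], rel["end_node"]):
                continue
            seen_rels.add(rel["id"])
            other = rel["end_node"] if rel["start_node"] == node_id else rel["start_node"]
            if other in by_id and other not in seen_nodes:
                seen_nodes.add(other)
                queue.append((other, dist + 1))

    out_nodes = [{**by_id[i], "properties": _project(by_id[i]["properties"], extracted_node_properties)}
                 for i in sorted(seen_nodes)]
    out_rels = [{**r, "properties": _project(r["properties"], extracted_relationship_properties)}
                for r in relationships if r["id"] in seen_rels]
    return out_nodes, out_rels


nodes = [
    {"id": 1, "labels": ["Person"], "properties": {"name": "John Doe", "age": 30}},
    {"id": 2, "labels": ["Person"], "properties": {"name": "Jane", "age": 28}},
    {"id": 3, "labels": ["Company"], "properties": {"name": "TechCorp"}},
]
relationships = [
    {"id": 10, "type": "FRIEND_OF", "start_node": 2, "end_node": 1,
     "properties": {"since": "2018", "closeness": 8}},
    {"id": 11, "type": "WORKS_AT", "start_node": 2, "end_node": 3, "properties": {}},
]
found_nodes, found_rels = traverse_in_memory(
    nodes, relationships, {"name": "John Doe"}, ["name"], ["since"], depth=1
)
```

Note that the FRIEND_OF edge points toward "John Doe" yet is still followed, which is what ignoring direction means here; raising `depth` to 2 would also reach the company node.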

upsert_node(label, identifier_key, identifier_value, properties=None) async

Upsert a node in the graph.

Parameters:

Name Type Description Default
label str

The label of the node.

required
identifier_key str

The key of the identifier.

required
identifier_value str

The value of the identifier.

required
properties dict[str, Any] | None

The properties of the node. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the operation.
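
"Upsert" means update the node if it already exists and insert it otherwise. A toy in-memory sketch of that behavior follows; the dict-based `graph`, this local `upsert_node` function, and the choice to merge new properties into existing ones are assumptions of the example, not the data store's implementation.

```python
def upsert_node(graph, label, identifier_key, identifier_value, properties=None):
    """Insert the node if no match exists, otherwise update its properties.

    `graph` is a plain dict keyed by (label, identifier_key, identifier_value);
    merging new properties into existing ones is an assumption of this sketch.
    """
    key = (label, identifier_key, identifier_value)
    node = graph.setdefault(key, {identifier_key: identifier_value})
    node.update(properties or {})
    return node


graph = {}
upsert_node(graph, "Person", "name", "John", {"age": 30})
# The second call matches the same identifier, so no duplicate node is created.
upsert_node(graph, "Person", "name", "John", {"age": 31, "city": "Jakarta"})
```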

upsert_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value, properties=None) async

Upsert a relationship between two nodes in the graph.

Parameters:

Name Type Description Default
node_source_key str

The key of the source node.

required
node_source_value str

The value of the source node.

required
relation str

The type of the relationship.

required
node_target_key str

The key of the target node.

required
node_target_value str

The value of the target node.

required
properties dict[str, Any] | None

The properties of the relationship. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the operation.

Neo4jGraphDataStore(uri, user, password, max_connection_pool_size=100, retry_config=None, **kwargs)

Bases: BaseGraphDataStore

Implementation of BaseGraphDataStore for Neo4j.

This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.

Attributes:

Name Type Description
driver Driver

The Neo4j driver.

Example
store = Neo4jGraphDataStore(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)
# Perform async operations
results = await store.query("MATCH (n) RETURN n")

# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})

Initialize Neo4jGraphDataStore.

Parameters:

Name Type Description Default
uri str

The URI of the graph store.

required
user str

The user of the graph store.

required
password str

The password of the graph store.

required
max_connection_pool_size int

The maximum size of the connection pool. Defaults to 100.

100
retry_config RetryConfig | None

Configuration for retry behavior. Defaults to None. If provided, query operations will be retried according to the specified RetryConfig parameters. When a database operation fails with a retryable exception (e.g., neo4j.exceptions.ServiceUnavailable), the operation will be automatically retried based on the retry policy defined in the configuration.

None
**kwargs Any

Additional keyword arguments for the driver.

{}
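
The retry behavior described for `retry_config` follows the usual retry-on-transient-error pattern. The sketch below illustrates that pattern in isolation; `TransientError`, `retry_async`, and its parameters are hypothetical names for this example and are neither the fields of `RetryConfig` nor the library's implementation.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical stand-in for a retryable error such as ServiceUnavailable."""


async def retry_async(operation, max_retries=3, base_delay=0.01):
    """Await operation(); on TransientError retry with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)


calls = {"count": 0}


async def flaky_query():
    """Fail twice, then succeed, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("service unavailable")
    return [{"n": 1}]


result = asyncio.run(retry_async(flaky_query))
```

Only the exception type treated as retryable triggers another attempt; any other exception propagates immediately.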

close() async

Close the graph data store.

delete_node(label, identifier_key, identifier_value) async

Delete a node from the graph.

Parameters:

Name Type Description Default
label str

The label of the node.

required
identifier_key str

The key of the identifier.

required
identifier_value str

The identifier of the node.

required

Returns:

Name Type Description
Any Any

The result of the operation.

delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value) async

Delete a relationship between two nodes in the graph.

Parameters:

Name Type Description Default
node_source_key str

The key of the source node.

required
node_source_value str

The identifier of the source node.

required
relation str

The type of the relationship.

required
node_target_key str

The key of the target node.

required
node_target_value str

The identifier of the target node.

required

Returns:

Name Type Description
Any Any

The result of the operation.

query(query, parameters=None) async

Query the graph store.

Parameters:

Name Type Description Default
query str

The query to be executed.

required
parameters dict[str, Any] | None

The parameters of the query. Defaults to None.

None

Returns:

Type Description
list[dict[str, Any]]

The result of the query.
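
Passing values through `parameters` rather than formatting them into the query string avoids quoting problems and injection. A small sketch, assuming Cypher's `$name` placeholder syntax; `build_person_query` is a hypothetical helper, and the resulting pair would be passed along as `store.query(query, parameters)`:

```python
def build_person_query(name, min_age):
    """Return a Cypher query and its parameters as separate values."""
    query = (
        "MATCH (p:Person) "
        "WHERE p.name = $name AND p.age >= $min_age "
        "RETURN p"
    )
    parameters = {"name": name, "min_age": min_age}
    return query, parameters


# The apostrophe never enters the query text, so no escaping is needed.
query, parameters = build_person_query("O'Brien", 30)
```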

traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3) async

Traverse the graph from a node with the specified properties, ignoring relationship direction, up to a given depth.

Example
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    extracted_node_properties=["name", "age"],
    extracted_relationship_properties=["since"],
    depth=1
)

This call starts from the node whose name property equals "John Doe", traverses the graph up to depth 1, and extracts the name and age properties from nodes and the since property from relationships.

nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    depth=2
)

This call starts from the node whose name property equals "John Doe", traverses the graph up to depth 2, and extracts all properties from nodes and relationships.

Parameters:

Name Type Description Default
node_properties dict[str, Any]

The properties of the starting node.

required
extracted_node_properties list[str] | None

The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None.

None
extracted_relationship_properties list[str] | None

The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None.

None
depth int

The depth of traversal. Defaults to 3.

3

Returns:

Type Description
tuple[list[dict[str, Any]], list[dict[str, Any]]]

A tuple containing two lists:
- List of nodes with their extracted properties (including the source node).
- List of relationships with their extracted properties.

Example return value:

nodes = [
    {
        "id": 1001,
        "labels": ["Person"],
        "properties": {"name": "John Doe", "age": 30, "occupation": "Engineer"}
    },
    {
        "id": 2001,
        "labels": ["Company"],
        "properties": {"name": "TechCorp", "industry": "Technology", "employees": 500}
    }
]

relationships = [
    {
        "id": 5002,
        "type": "FRIEND_OF",
        "start_node": 1001,
        "end_node": 1002,
        "properties": {"since": "2018-05-20", "closeness": 8}
    }
]

Raises:

Type Description
ValueError

If node_properties is empty or depth is less than 1.
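
In the example return value, each relationship's `start_node` and `end_node` hold node `id` values, so the two lists can be joined on that id. A minimal sketch of turning such a result into an undirected adjacency map, using hand-written data in the documented shape (`build_adjacency` is a hypothetical helper, not part of the data store):

```python
from collections import defaultdict


def build_adjacency(nodes, relationships):
    """Map each node id to the set of (relationship type, neighbor id) pairs."""
    adjacency = defaultdict(set)
    for node in nodes:
        adjacency[node["id"]]  # make sure isolated nodes appear too
    for rel in relationships:
        adjacency[rel["start_node"]].add((rel["type"], rel["end_node"]))
        adjacency[rel["end_node"]].add((rel["type"], rel["start_node"]))
    return dict(adjacency)


nodes = [
    {"id": 1001, "labels": ["Person"], "properties": {"name": "John Doe"}},
    {"id": 1002, "labels": ["Person"], "properties": {"name": "Jane Roe"}},
]
relationships = [
    {"id": 5002, "type": "FRIEND_OF", "start_node": 1001, "end_node": 1002,
     "properties": {"since": "2018-05-20"}},
]
adjacency = build_adjacency(nodes, relationships)
```

Each edge is recorded under both endpoints, matching the direction-agnostic traversal that produced the result.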

upsert_node(label, identifier_key, identifier_value, properties=None) async

Upsert a node in the graph.

Parameters:

Name Type Description Default
label str

The label of the node.

required
identifier_key str

The key of the identifier.

required
identifier_value str

The value of the identifier.

required
properties dict[str, Any] | None

The properties of the node. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the operation.

upsert_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value, properties=None) async

Upsert a relationship between two nodes in the graph.

Parameters:

Name Type Description Default
node_source_key str

The key of the source node.

required
node_source_value str

The value of the source node.

required
relation str

The type of the relationship.

required
node_target_key str

The key of the target node.

required
node_target_value str

The value of the target node.

required
properties dict[str, Any] | None

The properties of the relationship. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the operation.