Graph Data Store
Modules containing graph data store implementations to be used in Gen AI applications.
LightRAGPostgresDataStore(lm_invoker, em_invoker, postgres_db_host='localhost', postgres_db_port=5432, postgres_db_user='postgres', postgres_db_password='password', postgres_db_name='postgres', postgres_db_workspace='default', use_cache=False, lm_invoke_kwargs=None, instance=None, **kwargs)
Bases: BaseLightRAGDataStore
Data store implementation for LightRAG-based graph RAG using PostgreSQL.
This class extends BaseLightRAGDataStore to use PostgreSQL as the graph database, key-value store, and vector database.
To use this data store, ensure that you have a PostgreSQL instance with the AGE and PGVector extensions installed. You can start a PostgreSQL container with both extensions using the following docker run command, which publishes the database on host port 5455:
docker run -p 5455:5432 -d --name postgres-LightRag shangor/postgres-for-rag:v1.0 sh -c "service postgresql start && sleep infinity"
Example
from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_inference.lm_invoker import OpenAILMInvoker
from gllm_datastore.graph_data_store.light_rag_postgres_data_store import LightRAGPostgresDataStore
# Create the data store
data_store = await LightRAGPostgresDataStore(
    lm_invoker=OpenAILMInvoker(model_name="gpt-4o-mini"),
    em_invoker=OpenAIEMInvoker(model_name="text-embedding-3-small"),
    postgres_db_user="rag",
    postgres_db_password="rag",
    postgres_db_name="rag",
    postgres_db_host="localhost",
    postgres_db_port=5455,
)
# Retrieve using LightRAG instance
await data_store.query("What is AI?")
Attributes:
| Name | Type | Description |
|---|---|---|
| `instance` | `LightRAG` | The LightRAG instance used for indexing and querying. |
| `lm_invoker_adapter` | `LightRAGLMInvokerAdapter` | The adapter for the LM invoker. |
| `em_invoker_adapter` | `LightRAGEMInvokerAdapter` | The adapter for the EM invoker. |
| `postgres_config` | `PostgresDBConfig` | Pydantic model containing PostgreSQL configuration parameters. |
Initialize the LightRAGPostgresDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `lm_invoker` | `BaseLMInvoker` | The LM invoker to use. | required |
| `em_invoker` | `BaseEMInvoker` | The EM invoker to use. | required |
| `postgres_db_host` | `str` | The host for the PostgreSQL database. Defaults to "localhost". | `'localhost'` |
| `postgres_db_port` | `int` | The port for the PostgreSQL database. Defaults to 5432. | `5432` |
| `postgres_db_user` | `str` | The user for the PostgreSQL database. Defaults to "postgres". | `'postgres'` |
| `postgres_db_password` | `str` | The password for the PostgreSQL database. Defaults to "password". | `'password'` |
| `postgres_db_name` | `str` | The name for the PostgreSQL database. Defaults to "postgres". | `'postgres'` |
| `postgres_db_workspace` | `str` | The workspace for the PostgreSQL database. Defaults to "default". | `'default'` |
| `use_cache` | `bool` | Whether to enable caching for the LightRAG instance. Defaults to False. | `False` |
| `lm_invoke_kwargs` | `dict[str, Any] \| None` | Keyword arguments for the LM invoker. Defaults to None. | `None` |
| `instance` | `LightRAG \| None` | A configured LightRAG instance to use. Defaults to None. | `None` |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |
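The connection parameters above lend themselves to environment-based configuration. The following is a minimal sketch, assuming hypothetical `RAG_PG_*` environment variable names that are not part of the library; it only builds the keyword arguments, using the documented defaults as fallbacks, and does not import `gllm_datastore`.

```python
from __future__ import annotations

import os
from typing import Any, Mapping


def postgres_kwargs_from_env(env: Mapping[str, str] | None = None) -> dict[str, Any]:
    """Build connection keyword arguments for LightRAGPostgresDataStore.

    The RAG_PG_* variable names are hypothetical (an assumption of this
    sketch); the fallbacks mirror the defaults in the parameter table.
    """
    env = os.environ if env is None else env
    return {
        "postgres_db_host": env.get("RAG_PG_HOST", "localhost"),
        "postgres_db_port": int(env.get("RAG_PG_PORT", "5432")),
        "postgres_db_user": env.get("RAG_PG_USER", "postgres"),
        "postgres_db_password": env.get("RAG_PG_PASSWORD", "password"),
        "postgres_db_name": env.get("RAG_PG_NAME", "postgres"),
        "postgres_db_workspace": env.get("RAG_PG_WORKSPACE", "default"),
    }
```

The resulting dictionary could then be unpacked into the constructor alongside `lm_invoker` and `em_invoker`, for example `LightRAGPostgresDataStore(lm_invoker=..., em_invoker=..., **postgres_kwargs_from_env())`.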
LlamaIndexNeo4jGraphRAGDataStore(*args, **kwargs)
Bases: LlamaIndexGraphRAGDataStore, Neo4jPropertyGraphStore
Graph RAG data store for Neo4j.
This class extends the Neo4jPropertyGraphStore class from LlamaIndex and provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
| `neo4j_version_tuple` | `tuple[int, ...]` | The Neo4j version tuple. |
Example
store = LlamaIndexNeo4jGraphRAGDataStore(
    url="bolt://localhost:7687",
    username="neo4j",
    password="password"
)
# Perform RAG query
results = await store.query("What is the relationship between X and Y?")
# Delete document data
await store.delete_by_document_id("doc123")
Initialize the LlamaIndexNeo4jGraphRAGDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | | Variable length argument list. | `()` |
| `**kwargs` | | Arbitrary keyword arguments. | `{}` |
delete_by_document_id(document_id, **kwargs)
async
Delete nodes and edges by document ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `document_id` | `str` | The document ID. | required |
| `**kwargs` | `Any` | Additional keyword arguments. | `{}` |
NebulaGraphDataStore(url, port, user, password, space, operation_wait_time=5)
Bases: BaseGraphDataStore
Implementation of BaseGraphDataStore for Nebula Graph.
This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Nebula graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
| `connection_pool` | `ConnectionPool` | The connection pool for Nebula Graph. |
| `space` | `str` | The space name. |
| `user` | `str` | The username. |
| `password` | `str` | The password. |
| `operation_wait_time` | `int` | The timeout in seconds. |
Example
store = NebulaGraphDataStore(
    url="127.0.0.1",
    port=9669,
    user="root",
    password="nebula",
    space="testing"
)
# Perform query
results = await store.query("MATCH (n) RETURN n")
# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})
Initialize NebulaGraphDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `url` | `str` | The URL of the graph store. | required |
| `port` | `int` | The port of the graph store. | required |
| `user` | `str` | The user of the graph store. | required |
| `password` | `str` | The password of the graph store. | required |
| `space` | `str` | The space name. | required |
| `operation_wait_time` | `int` | The operation wait time in seconds. Defaults to 5. | `5` |
close()
async
Close the graph data store.
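Because `close()` is a coroutine, it is easy to skip when an earlier call raises. The following is a minimal sketch of one way to guarantee cleanup, assuming only that the store exposes an async `close()` method as documented here; `closing_store` and `FakeStore` are hypothetical names introduced for illustration, and `FakeStore` merely stands in for a real store.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def closing_store(store: Any) -> AsyncIterator[Any]:
    """Yield the store and always await its close() afterwards."""
    try:
        yield store
    finally:
        await store.close()


class FakeStore:
    """Stand-in for a graph data store; it only mimics the async close() method."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo() -> bool:
    store = FakeStore()
    async with closing_store(store) as active:
        # Queries and upserts would go here; the store is still open.
        assert active.closed is False
    # close() has been awaited by the time the block exits.
    return store.closed
```

With a real store, the same helper would be used as `async with closing_store(NebulaGraphDataStore(...)) as store:`.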
delete_node(label, identifier_key, identifier_value)
async
Delete a node from the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `label` | `str` | The label of the node. | required |
| `identifier_key` | `str` | The key of the identifier. | required |
| `identifier_value` | `str` | The identifier of the node. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value)
async
Delete a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_source_key` | `str` | The key of the source node. | required |
| `node_source_value` | `str` | The identifier of the source node. | required |
| `relation` | `str` | The type of the relationship. | required |
| `node_target_key` | `str` | The key of the target node. | required |
| `node_target_value` | `str` | The identifier of the target node. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
get_nodes(label=None)
async
Get all nodes with optional label filter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `label` | `str \| None` | The label of the nodes. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | The result of the query. |
get_relationships(source_value=None, relation=None)
async
Get relationships with optional filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `source_value` | `str \| None` | The source vertex identifier. Defaults to None. | `None` |
| `relation` | `str \| None` | The relationship type. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | The result of the query. |
query(query, parameters=None)
async
Query the graph store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The query to be executed. | required |
| `parameters` | `dict[str, Any] \| None` | The parameters of the query. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | The result of the query. |
traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3)
async
Traverse the graph from a node with the specified properties, ignoring relationship direction, up to a given depth.
Example
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    extracted_node_properties=["name", "age"],
    extracted_relationship_properties=["since"],
    depth=1
)
This starts from the node whose name property equals "John Doe" and traverses
the graph up to depth 1, extracting the name and age properties from nodes
and the since property from relationships.
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    depth=2
)
This starts from the node whose name property equals "John Doe" and traverses
the graph up to depth 2, extracting all properties from nodes and relationships.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_properties` | `dict[str, Any]` | The properties of the starting node. | required |
| `extracted_node_properties` | `list[str] \| None` | The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None. | `None` |
| `extracted_relationship_properties` | `list[str] \| None` | The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None. | `None` |
| `depth` | `int` | The depth of traversal. Defaults to 3. | `3` |
Returns:
| Type | Description |
|---|---|
| `tuple[list[dict[str, Any]], list[dict[str, Any]]]` | A tuple containing two lists: the nodes with their extracted properties (including the source node), and the relationships with their extracted properties. |
Example return value:
nodes = [
    {"id": 1001, "labels": ["Person"], "properties": {"name": "John Doe", "age": 30, "occupation": "Engineer"}},
    {"id": 2001, "labels": ["Company"], "properties": {"name": "TechCorp", "industry": "Technology", "employees": 500}},
]
relationships = [
    {"id": 5002, "type": "FRIEND_OF", "start_node": 1001, "end_node": 1002, "properties": {"since": "2018-05-20", "closeness": 8}},
]
Raises:
| Type | Description |
|---|---|
| `ValueError` | If node_properties is empty or depth is less than 1. |
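To make the traversal semantics concrete, the following is a minimal in-memory sketch of a depth-limited walk that ignores edge direction. It is an illustration only, not the query the store issues against Nebula Graph; the function name `traverse_undirected` and the `(source, target)` edge representation are assumptions of this sketch.

```python
from collections import deque


def traverse_undirected(edges, start, depth=3):
    """Return (visited_nodes, used_edges) reachable from start within depth hops.

    edges is a list of (source, target) pairs; direction is ignored.
    """
    if depth < 1:
        raise ValueError("depth must be at least 1")

    # Build an undirected adjacency map so every edge can be followed both ways.
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, []).append((dst, (src, dst)))
        neighbours.setdefault(dst, []).append((src, (src, dst)))

    visited = {start}
    used_edges = set()
    queue = deque([(start, 0)])
    while queue:
        node, dist = queue.popleft()
        if dist == depth:
            continue  # do not expand nodes that already sit at the depth limit
        for other, edge in neighbours.get(node, []):
            used_edges.add(edge)
            if other not in visited:
                visited.add(other)
                queue.append((other, dist + 1))
    return visited, used_edges
```

In this sketch the edge `(3, 2)` is followed from node 2 to node 3 even though it is stored in the opposite direction, which is what "ignoring relationship direction" amounts to.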
upsert_node(label, identifier_key, identifier_value, properties=None)
async
Upsert a node in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `label` | `str` | The label of the node. | required |
| `identifier_key` | `str` | The key of the identifier. | required |
| `identifier_value` | `str` | The value of the identifier. | required |
| `properties` | `dict[str, Any] \| None` | The properties of the node. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
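An upsert creates the node if no node matches the identifier and updates it otherwise. The following toy sketch shows that behavior with a plain dictionary. It assumes that new properties are merged into existing ones rather than replacing them, which this reference does not confirm for the actual store; `InMemoryNodes` is a hypothetical class used only for illustration.

```python
from typing import Any


class InMemoryNodes:
    """Toy node table keyed by (label, identifier_key, identifier_value)."""

    def __init__(self) -> None:
        self.nodes: dict[tuple[str, str, str], dict[str, Any]] = {}

    def upsert_node(self, label, identifier_key, identifier_value, properties=None):
        key = (label, identifier_key, identifier_value)
        # Create the node on first sight, seeded with its identifier property.
        node = self.nodes.setdefault(key, {identifier_key: identifier_value})
        # Assumption: merge new properties into the existing ones.
        node.update(properties or {})
        return node
```

Calling `upsert_node("Person", "name", "John", {"age": 30})` twice with different properties leaves a single node that carries the union of both property sets.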
upsert_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value, properties=None)
async
Upsert a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_source_key` | `str` | The key of the source node. | required |
| `node_source_value` | `str` | The value of the source node. | required |
| `relation` | `str` | The type of the relationship. | required |
| `node_target_key` | `str` | The key of the target node. | required |
| `node_target_value` | `str` | The value of the target node. | required |
| `properties` | `dict[str, Any] \| None` | The properties of the relationship. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
Neo4jGraphDataStore(uri, user, password, max_connection_pool_size=100, retry_config=None, **kwargs)
Bases: BaseGraphDataStore
Implementation of BaseGraphDataStore for Neo4j.
This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
| `driver` | `Driver` | The Neo4j driver. |
Example
store = Neo4jGraphDataStore(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)
# Perform async operations
results = await store.query("MATCH (n) RETURN n")
# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})
Initialize Neo4jGraphDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uri` | `str` | The URI of the graph store. | required |
| `user` | `str` | The user of the graph store. | required |
| `password` | `str` | The password of the graph store. | required |
| `max_connection_pool_size` | `int` | The maximum size of the connection pool. Defaults to 100. | `100` |
| `retry_config` | `RetryConfig \| None` | Configuration for retry behavior. Defaults to None. If provided, query operations will be retried according to the specified RetryConfig parameters. When a database operation fails with a retryable exception (e.g., neo4j.exceptions.ServiceUnavailable), the operation will be automatically retried based on the retry policy defined in the configuration. | `None` |
| `**kwargs` | `Any` | Additional keyword arguments for the driver. | `{}` |
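The retry behavior described for `retry_config` can be pictured as a loop around the database call. The sketch below is a generic illustration with exponential backoff; it does not reproduce `RetryConfig` or its fields, and `TransientError`, `retry_async`, `max_retries`, and `base_delay` are hypothetical names chosen for this example.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical stand-in for a retryable error such as ServiceUnavailable."""


async def retry_async(operation, max_retries=3, base_delay=0.01, retry_on=(TransientError,)):
    """Await operation(), retrying on retry_on errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries:
                raise  # retries exhausted, surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
```

Errors outside `retry_on` propagate immediately, which mirrors the distinction the parameter description draws between retryable and non-retryable exceptions.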
close()
async
Close the graph data store.
delete_node(label, identifier_key, identifier_value)
async
Delete a node from the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `label` | `str` | The label of the node. | required |
| `identifier_key` | `str` | The key of the identifier. | required |
| `identifier_value` | `str` | The identifier of the node. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value)
async
Delete a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_source_key` | `str` | The key of the source node. | required |
| `node_source_value` | `str` | The identifier of the source node. | required |
| `relation` | `str` | The type of the relationship. | required |
| `node_target_key` | `str` | The key of the target node. | required |
| `node_target_value` | `str` | The identifier of the target node. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
query(query, parameters=None)
async
Query the graph store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The query to be executed. | required |
| `parameters` | `dict[str, Any] \| None` | The parameters of the query. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | The result of the query. |
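Passing values through `parameters` instead of formatting them into the query string keeps user input out of the Cypher text. The sketch below builds such a query and parameter pair. `find_by_property` is a hypothetical helper, not part of the library; because Cypher does not accept labels or property keys as ordinary `$` parameters, the sketch validates them against a conservative pattern before interpolating them.

```python
import re
from typing import Any

# Conservative pattern for labels and property keys (an assumption of this sketch).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def find_by_property(label: str, key: str, value: Any) -> tuple[str, dict[str, Any]]:
    """Return a Cypher query and its parameters for matching nodes by one property."""
    for name in (label, key):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    # Only the value travels as a parameter; label and key were validated above.
    query = f"MATCH (n:{label}) WHERE n.{key} = $value RETURN n"
    return query, {"value": value}
```

The pair could then be handed to the documented method as `results = await store.query(*find_by_property("Person", "name", "John"))`.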
traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3)
async
Traverse the graph from a node with the specified properties, ignoring relationship direction, up to a given depth.
Example
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    extracted_node_properties=["name", "age"],
    extracted_relationship_properties=["since"],
    depth=1
)
This starts from the node whose name property equals "John Doe" and traverses
the graph up to depth 1, extracting the name and age properties from nodes
and the since property from relationships.
nodes, relationships = await graph_data_store.traverse_graph(
    node_properties={"name": "John Doe"},
    depth=2
)
This starts from the node whose name property equals "John Doe" and traverses
the graph up to depth 2, extracting all properties from nodes and relationships.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_properties` | `dict[str, Any]` | The properties of the starting node. | required |
| `extracted_node_properties` | `list[str] \| None` | The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None. | `None` |
| `extracted_relationship_properties` | `list[str] \| None` | The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None. | `None` |
| `depth` | `int` | The depth of traversal. Defaults to 3. | `3` |
Returns:
| Type | Description |
|---|---|
| `tuple[list[dict[str, Any]], list[dict[str, Any]]]` | A tuple containing two lists: the nodes with their extracted properties (including the source node), and the relationships with their extracted properties. |
Example return value:
nodes = [
    {"id": 1001, "labels": ["Person"], "properties": {"name": "John Doe", "age": 30, "occupation": "Engineer"}},
    {"id": 2001, "labels": ["Company"], "properties": {"name": "TechCorp", "industry": "Technology", "employees": 500}},
]
relationships = [
    {"id": 5002, "type": "FRIEND_OF", "start_node": 1001, "end_node": 1002, "properties": {"since": "2018-05-20", "closeness": 8}},
]
Raises:
| Type | Description |
|---|---|
| `ValueError` | If node_properties is empty or depth is less than 1. |
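The two returned lists are often easier to work with once they are indexed. The following sketch assumes the dictionary shape shown in the example return value (`id` and `properties` on nodes, `start_node` and `end_node` on relationships); the helper names are hypothetical.

```python
from collections import defaultdict


def neighbours_by_node(relationships):
    """Map each node id to the set of node ids it is connected to, in either direction."""
    adjacency = defaultdict(set)
    for rel in relationships:
        adjacency[rel["start_node"]].add(rel["end_node"])
        adjacency[rel["end_node"]].add(rel["start_node"])
    return dict(adjacency)


def names_by_id(nodes):
    """Map each node id to its "name" property, or None when it is absent."""
    return {node["id"]: node["properties"].get("name") for node in nodes}
```

After `nodes, relationships = await store.traverse_graph(...)`, `neighbours_by_node(relationships)` gives an undirected adjacency view that matches how the traversal itself treats direction.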
upsert_node(label, identifier_key, identifier_value, properties=None)
async
Upsert a node in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `label` | `str` | The label of the node. | required |
| `identifier_key` | `str` | The key of the identifier. | required |
| `identifier_value` | `str` | The value of the identifier. | required |
| `properties` | `dict[str, Any] \| None` | The properties of the node. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |
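In Cypher, a common way to express an upsert is `MERGE` followed by `SET n += $properties`. The sketch below builds such a statement from the same four arguments. Whether Neo4jGraphDataStore generates this exact query internally is not stated in this reference, so treat it as an assumption; `build_upsert_node_query` is a hypothetical helper.

```python
import re

# Conservative pattern for labels and property keys (an assumption of this sketch).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_upsert_node_query(label, identifier_key, identifier_value, properties=None):
    """Return a (query, parameters) pair that creates or updates one node."""
    for name in (label, identifier_key):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    query = (
        # MERGE matches the node by its identifier or creates it when missing.
        f"MERGE (n:{label} {{{identifier_key}: $identifier_value}}) "
        # += adds or overwrites the given properties and keeps the others.
        "SET n += $properties "
        "RETURN n"
    )
    parameters = {"identifier_value": identifier_value, "properties": properties or {}}
    return query, parameters
```

The generated pair has the same shape as the arguments of `query(query, parameters=None)`, so it could be executed through that method when a hand-written statement is preferred.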
upsert_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value, properties=None)
async
Upsert a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `node_source_key` | `str` | The key of the source node. | required |
| `node_source_value` | `str` | The value of the source node. | required |
| `relation` | `str` | The type of the relationship. | required |
| `node_target_key` | `str` | The key of the target node. | required |
| `node_target_value` | `str` | The value of the target node. | required |
| `properties` | `dict[str, Any] \| None` | The properties of the relationship. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | The result of the operation. |