Graph Data Store
Modules containing graph data store implementations to be used in Gen AI applications.
FalkorDBGraphDataStore(host, port, graph_name, password=None, decode_responses=True, max_connections=16, **kwargs)
Bases: BaseGraphDataStore
Implementation of BaseGraphDataStore for FalkorDB.
Notes
FalkorDB provides an asyncio client (falkordb.asyncio.FalkorDB) that supports awaiting
graph queries using an async Redis connection pool.
Attributes:
| Name | Type | Description |
|---|---|---|
_pool |
BlockingConnectionPool
|
Async Redis connection pool used by the client. |
_db |
FalkorDB
|
The FalkorDB async client instance. |
_graph |
Any
|
The selected graph handle returned by the FalkorDB client. |
graph_name |
str
|
The graph name used by this data store. |
Example
store = FalkorDBGraphDataStore(
host="localhost",
port=6379,
graph_name="my_graph",
)
await store.upsert_node(
label="Person",
identifier_key="name",
identifier_value="John",
properties={"age": 30},
)
results = await store.query("MATCH (n:Person) RETURN n LIMIT 5")
await store.close()
Initialize FalkorDBGraphDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
FalkorDB host. |
required |
port
|
int
|
FalkorDB port. |
required |
graph_name
|
str
|
Graph name. |
required |
password
|
str | None
|
FalkorDB password. Defaults to None. |
None
|
decode_responses
|
bool
|
Decode responses to string. Defaults to True. |
True
|
max_connections
|
int
|
Max async Redis connections. Defaults to 16. |
16
|
**kwargs
|
Any
|
Extra kwargs forwarded to FalkorDB client (if supported). |
{}
|
close()
async
Close resources held by the data store.
delete_node(label, identifier_key, identifier_value)
async
Delete a node from the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str
|
Node label. |
required |
identifier_key
|
str
|
Unique identifier property key. |
required |
identifier_value
|
str
|
Unique identifier property value. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value)
async
Delete a relationship between two nodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str
|
Source node identifier key. |
required |
node_source_value
|
str
|
Source node identifier value. |
required |
relation
|
str
|
Relationship type. |
required |
node_target_key
|
str
|
Target node identifier key. |
required |
node_target_value
|
str
|
Target node identifier value. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
query(query, parameters=None)
async
Execute a query against FalkorDB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Cypher query. |
required |
parameters
|
dict[str, Any] | None
|
Query parameters. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: Query results. |
traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3)
async
Traverse graph from a node with specified properties up to a given depth.
This traversal ignores relationship direction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_properties
|
dict[str, Any]
|
Properties of the starting node. |
required |
extracted_node_properties
|
list[str] | None
|
Node properties to extract. If None, returns all. |
None
|
extracted_relationship_properties
|
list[str] | None
|
Relationship properties to extract. If None, returns all. |
None
|
depth
|
int
|
Maximum traversal depth. Defaults to 3. |
3
|
Returns:
| Type | Description |
|---|---|
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
tuple[list[dict[str, Any]], list[dict[str, Any]]]: (nodes, relationships) |
Raises:
| Type | Description |
|---|---|
ValueError
|
If node_properties is empty, depth < 1, or property keys are invalid. |
upsert_node(label=None, identifier_key=None, identifier_value=None, properties=None, *, node=None)
async
Upsert a node in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str | None
|
The label of the node.
Deprecated – use |
None
|
identifier_key
|
str | None
|
The key of the identifier.
Deprecated – use |
None
|
identifier_value
|
str | None
|
The value of the identifier.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra node properties.
Deprecated – use |
None
|
node
|
Node | None
|
A |
None
|
Returns:
| Type | Description |
|---|---|
Node | None
|
Node | None: The upserted node, or |
upsert_relationship(node_source_key=None, node_source_value=None, relation=None, node_target_key=None, node_target_value=None, properties=None, *, edge=None)
async
Upsert a relationship between two nodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str | None
|
The key of the source node.
Deprecated – use |
None
|
node_source_value
|
str | None
|
The value of the source node.
Deprecated – use |
None
|
relation
|
str | None
|
The type of the relationship.
Deprecated – use |
None
|
node_target_key
|
str | None
|
The key of the target node.
Deprecated – use |
None
|
node_target_value
|
str | None
|
The value of the target node.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra relationship properties.
Deprecated – use |
None
|
edge
|
Edge | None
|
An |
None
|
Returns:
| Type | Description |
|---|---|
Edge | None
|
Edge | None: The upserted edge, or |
GraphitiGraphRAGDataStore(index_name=None, reranker=GraphitiRerankerType.NONE, **kwargs)
Bases: BaseGraphRAGDataStore, ABC
Abstract base class for Graphiti GraphRAG data stores.
This class extends BaseGraphRAGDataStore to provide Graphiti-specific functionality for temporal knowledge graph operations.
Attributes:
| Name | Type | Description |
|---|---|---|
_graphiti |
Graphiti | None
|
The underlying Graphiti instance. |
index_name |
str | None
|
Standardized namespacing attribute that maps to Graphiti's group_id parameter. |
Initialize the GraphitiGraphRAGDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
index_name
|
str | None
|
Standardized namespacing attribute that maps to Graphiti's group_id. Defaults to None. |
None
|
reranker
|
GraphitiRerankerType
|
Type of reranker to use. Options are NONE (no reranking) or LM_BASED (LM-based reranking). Defaults to GraphitiRerankerType.NONE. |
NONE
|
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
delete_by_chunk_id(chunk_id, **kwargs)
async
Delete an episode by chunk ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
str
|
The chunk ID. |
required |
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the Graphiti instance is not initialized. |
delete_by_document_id(document_id, **kwargs)
async
Delete all episodes belonging to a document.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
str
|
The document ID. |
required |
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the Graphiti instance is not initialized or any episode deletion fails. |
insert(chunks, **kwargs)
async
Insert chunks into the graph as episodes.
Creates episodes from the provided chunks, stores them in Graphiti, and records custom chunk_id and document_id on the Episodic nodes. Uses parallel execution via asyncio.gather for improved performance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunks
|
list[Chunk]
|
List of Chunk objects to insert. Each chunk should have: - id (str): The chunk ID. - document_id (str): The document ID. - text (str): The episode content text. |
required |
**kwargs
|
Any
|
Additional keyword arguments including: - source_description (str): Description of the source (defaults to document_id). - reference_time (datetime): Timestamp for the episode (defaults to now). |
{}
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
dict[str, Any]: Result containing the episodes information with keys: - episodes (list): List of episode objects. - episode_uuids (list): List of episode UUIDs. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the Graphiti instance is not initialized. |
ValueError
|
If required fields are missing from chunks. |
query(query, **kwargs)
async
Query the GraphRAG data store.
Delegates to Graphiti's search() method to retrieve edges/facts matching the query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
The query to be executed. |
required |
**kwargs
|
Any
|
Additional keyword arguments: - group_ids (list[str] | None): Filter by group IDs (index_name). - Other arguments passed to Graphiti's search method. |
{}
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: The result of the query as a list of dictionaries. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the Graphiti instance is not initialized. |
GraphitiNeo4jGraphRAGDataStore(url, username, password, database=None, lm_invoker=None, em_invoker=None, index_name=None, reranker=GraphitiRerankerType.NONE, **kwargs)
Bases: GraphitiGraphRAGDataStore
Concrete implementation of GraphitiGraphRAGDataStore using Neo4j backend.
This class extends GraphitiGraphRAGDataStore to provide Neo4j-specific functionality for temporal knowledge graph operations.
Attributes:
| Name | Type | Description |
|---|---|---|
_graphiti |
Graphiti | None
|
The underlying Graphiti instance. |
index_name |
str | None
|
Standardized namespacing attribute that maps to Graphiti's group_id parameter. |
Initialize the GraphitiNeo4jGraphRAGDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Neo4j connection URL (e.g., "bolt://localhost:7687"). |
required |
username
|
str
|
Neo4j username. |
required |
password
|
str
|
Neo4j password. |
required |
database
|
str | None
|
Neo4j database name. If None, uses Neo4jDriver's default. Defaults to None. |
None
|
lm_invoker
|
BaseLMInvoker | None
|
GLLM LM invoker instance. Will be wrapped with GraphitiLMInvokerAdapter for Graphiti compatibility. Defaults to None. |
None
|
em_invoker
|
BaseEMInvoker | None
|
GLLM EM invoker instance. Will be wrapped with GraphitiEMInvokerAdapter for Graphiti compatibility. Defaults to None. |
None
|
index_name
|
str | None
|
Standardized namespacing attribute that maps to Graphiti's group_id. Defaults to None. |
None
|
reranker
|
GraphitiRerankerType
|
Type of reranker to use. Options are NONE (no reranking) or LM_BASED (LM-based reranking). Defaults to GraphitiRerankerType.NONE. |
NONE
|
**kwargs
|
Any
|
Additional keyword arguments passed to Graphiti constructor. |
{}
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If graphiti-core is not installed. |
NotImplementedError
|
If LM_BASED reranker is specified (not yet implemented). |
create_required_indexes()
async
Create required Neo4j indexes for optimal query performance.
Creates indexes on custom_chunk_id and custom_document_id properties of Episodic nodes to improve lookup performance.
Note
This method should be called once during database setup. Creating the same index multiple times is idempotent (uses IF NOT EXISTS).
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the Neo4j index creation fails. |
LightRAGPostgresDataStore(lm_invoker, em_invoker, postgres_db_host='localhost', postgres_db_port=5432, postgres_db_user='postgres', postgres_db_password='password', postgres_db_name='postgres', postgres_db_workspace='default', use_cache=False, lm_invoke_kwargs=None, instance=None, **kwargs)
Bases: BaseLightRAGDataStore
Data store implementation for LightRAG-based graph RAG using PostgreSQL.
This class extends the LightRAGDataStore to use PostgreSQL as the graph database, key-value store, and vector database.
To use this data store, please ensure that you have a PostgreSQL with AGE and PGVector extensions installed. You can use the following docker run command to start a PostgreSQL container with AGE and PGVector extensions:
docker run -p 5455:5432 -d --name postgres-LightRag shangor/postgres-for-rag:v1.0 sh -c "service postgresql start && sleep infinity"
Example
from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_inference.lm_invoker import OpenAILMInvoker
from gllm_datastore.graph_data_store.light_rag_postgres_data_store import LightRAGPostgresDataStore
# Create the indexer
data_store = await LightRAGPostgresDataStore(
lm_invoker=OpenAILMInvoker(model_name="gpt-4o-mini"),
em_invoker=OpenAIEMInvoker(model_name="text-embedding-3-small"),
postgres_db_user="rag",
postgres_db_password="rag",
postgres_db_name="rag",
postgres_db_host="localhost",
postgres_db_port=5455,
)
# Retrieve using LightRAG instance
await data_store.query("What is AI?")
Attributes:
| Name | Type | Description |
|---|---|---|
instance |
LightRAG
|
The LightRAG instance used for indexing and querying. |
lm_invoker_adapter |
LightRAGLMInvokerAdapter
|
The adapter for the LM invoker. |
em_invoker_adapter |
LightRAGEMInvokerAdapter
|
The adapter for the EM invoker. |
postgres_config |
PostgresDBConfig
|
Pydantic model containing PostgreSQL configuration parameters. |
Initialize the LightRAGPostgresIndexer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
lm_invoker
|
BaseLMInvoker
|
The LM invoker to use. |
required |
em_invoker
|
BaseEMInvoker
|
The EM invoker to use. |
required |
postgres_db_host
|
str
|
The host for the PostgreSQL database. Defaults to "localhost". |
'localhost'
|
postgres_db_port
|
int
|
The port for the PostgreSQL database. Defaults to 5432. |
5432
|
postgres_db_user
|
str
|
The user for the PostgreSQL database. Defaults to "postgres". |
'postgres'
|
postgres_db_password
|
str
|
The password for the PostgreSQL database. Defaults to "password". |
'password'
|
postgres_db_name
|
str
|
The name for the PostgreSQL database. Defaults to "postgres". |
'postgres'
|
postgres_db_workspace
|
str
|
The workspace for the PostgreSQL database. Defaults to "default". |
'default'
|
use_cache
|
bool
|
Whether to enable caching for the LightRAG instance. Defaults to False. |
False
|
lm_invoke_kwargs
|
dict[str, Any] | None
|
Keyword arguments for the LM invoker. Defaults to None. |
None
|
instance
|
LightRAG | None
|
A configured LightRAG instance to use. Defaults to None. |
None
|
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
LlamaIndexNeo4jGraphRAGDataStore(url, username, password, lm_invoker=None, em_invoker=None, **kwargs)
Bases: LlamaIndexGraphRAGDataStore, Neo4jPropertyGraphStore
Graph RAG data store for Neo4j.
This class extends the Neo4jPropertyGraphStore class from LlamaIndex. This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
neo4j_version_tuple |
tuple[int, ...]
|
The Neo4j version tuple. |
lm_invoker |
BaseLMInvoker | None
|
The GLLM language model invoker (inherited from parent). |
em_invoker |
BaseEMInvoker | None
|
The GLLM embedding model invoker (inherited from parent). |
llm |
LLM | None
|
The LlamaIndex LLM instance (converted from lm_invoker, inherited from parent). |
embed_model |
BaseEmbedding | None
|
The LlamaIndex embedding instance (converted from em_invoker, inherited from parent). |
Example
# Option 1: Use with GLLM invokers (recommended)
from gllm_inference.builder import build_lm_invoker, build_em_invoker
lm_invoker = build_lm_invoker(model_id="openai/gpt-4o-mini")
em_invoker = build_em_invoker(model_id="openai/text-embedding-3-small")
store = LlamaIndexNeo4jGraphRAGDataStore(
url="bolt://localhost:7687",
username="neo4j",
password="password",
lm_invoker=lm_invoker, # Optional: Auto-converted to LlamaIndex LLM
em_invoker=em_invoker, # Optional: Auto-converted to LlamaIndex Embedding
)
# Option 2: Use with LlamaIndex LLM/Embeddings directly
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
store = LlamaIndexNeo4jGraphRAGDataStore(
url="bolt://localhost:7687",
username="neo4j",
password="password",
)
# Perform RAG query
results = await store.query("What is the relationship between X and Y?")
# Delete document data
await store.delete_by_document_id("doc123")
Initialize the LlamaIndexNeo4jGraphRAGDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
The Neo4j database URL (e.g., "bolt://localhost:7687"). |
required |
username
|
str
|
The Neo4j database username. |
required |
password
|
str
|
The Neo4j database password. |
required |
lm_invoker
|
BaseLMInvoker | None
|
GLLM language model invoker. If provided, it will be automatically converted to a LlamaIndex LLM instance by the parent class. Defaults to None. |
None
|
em_invoker
|
BaseEMInvoker | None
|
GLLM embedding model invoker. If provided, it will be automatically converted to a LlamaIndex BaseEmbedding instance by the parent class. Defaults to None. |
None
|
**kwargs
|
Any
|
Additional keyword arguments passed to Neo4jPropertyGraphStore. |
{}
|
delete_by_chunk_id(chunk_id, **kwargs)
async
Perform cascade deletion of chunk and their related entities in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_id
|
str
|
ID of the chunk to be deleted. |
required |
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
delete_by_document_id(document_id, **kwargs)
async
Delete nodes and edges by document ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
document_id
|
str
|
The document ID. |
required |
**kwargs
|
Any
|
Additional keyword arguments. |
{}
|
NebulaGraphDataStore(url, port, user, password, space, operation_wait_time=5)
Bases: BaseGraphDataStore
Implementation of BaseGraphDataStore for Nebula Graph.
This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Nebula graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
connection_pool |
ConnectionPool
|
The connection pool for Nebula Graph. |
space |
str
|
The space name. |
user |
str
|
The username. |
password |
str
|
The password. |
operation_wait_time |
int
|
The timeout in seconds. |
Example
store = NebulaGraphDataStore(
url="127.0.0.1",
port=9669,
user="root",
password="nebula",
space="testing"
)
# Perform query
results = await store.query("MATCH (n) RETURN n")
# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})
Initialize NebulaGraphDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
The URL of the graph store. |
required |
port
|
int
|
The port of the graph store. |
required |
user
|
str
|
The user of the graph store. |
required |
password
|
str
|
The password of the graph store. |
required |
space
|
str
|
The space name. |
required |
operation_wait_time
|
int
|
The operation wait time in seconds. Defaults to 5. |
5
|
close()
async
Close the graph data store.
delete_node(label, identifier_key, identifier_value)
async
Delete a node from the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str
|
The label of the node. |
required |
identifier_key
|
str
|
The key of the identifier. |
required |
identifier_value
|
str
|
The identifier of the node. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value)
async
Delete a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str
|
The key of the source node. |
required |
node_source_value
|
str
|
The identifier of the source node. |
required |
relation
|
str
|
The type of the relationship. |
required |
node_target_key
|
str
|
The key of the target node. |
required |
node_target_value
|
str
|
The identifier of the target node. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
get_nodes(label=None)
async
Get all nodes with optional label filter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str | None
|
The label of the nodes. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: The result of the query. |
get_relationships(source_value=None, relation=None)
async
Get relationships with optional filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_value
|
str | None
|
The source vertex identifier. Defaults to None. |
None
|
relation
|
str | None
|
The relationship type. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: The result of the query. |
query(query, parameters=None)
async
Query the graph store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
The query to be executed. |
required |
parameters
|
dict[str, Any] | None
|
The parameters of the query. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: The result of the query. |
traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3)
async
Traverse graph from a node with specified properties, ignoring relationship's direction, up to a given depth.
Example
nodes, relationships = await graph_data_store.traverse_graph(
node_properties={"name": "John Doe"},
extracted_node_properties=["name", "age"],
extracted_relationship_properties=["since"],
depth=1
)
Means starting from the node with property name equal to "John Doe", traverse
the graph up to depth 1, extracting the name and age properties from nodes
and the since property from relationships.
nodes, relationships = await graph_data_store.traverse_graph(
node_properties={"name": "John Doe"},
depth=2
)
Means starting from the node with property name equal to "John Doe", traverse
the graph up to depth 2, extracting all properties from nodes and relationships.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_properties
|
dict[str, Any]
|
The properties of the starting node. |
required |
extracted_node_properties
|
list[str] | None
|
The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None. |
None
|
extracted_relationship_properties
|
list[str] | None
|
The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None. |
None
|
depth
|
int
|
The depth of traversal. Defaults to 3. |
3
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
tuple[list[dict[str, Any]], list[dict[str, Any]]]: A tuple containing two lists: - List of nodes with their extracted properties (including the source node). - List of relationships with their extracted properties. |
list[dict[str, Any]]
|
Example return value: |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
nodes = [ { "id": 1001, "labels": ["Person"], "properties": { "name": "John Doe", "age": 30, "occupation": "Engineer" } }, { "id": 2001, "labels": ["Company"], "properties": { "name": "TechCorp", "industry": "Technology", "employees": 500 } } |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
] |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
relationships = [ { "id": 5002, "type": "FRIEND_OF", "start_node": 1001, "end_node": 1002, "properties": { "since": "2018-05-20", "closeness": 8 } } |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
] |
Raises:
| Type | Description |
|---|---|
ValueError
|
If node_properties is empty or depth is less than 1. |
upsert_node(label=None, identifier_key=None, identifier_value=None, properties=None, *, node=None)
async
Upsert a node in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str | None
|
The label of the node.
Deprecated – use |
None
|
identifier_key
|
str | None
|
The key of the identifier.
Deprecated – use |
None
|
identifier_value
|
str | None
|
The value of the identifier.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra node properties.
Deprecated – use |
None
|
node
|
Node | None
|
A |
None
|
Returns:
| Type | Description |
|---|---|
Node | None
|
Node | None: The upserted node, or |
upsert_relationship(node_source_key=None, node_source_value=None, relation=None, node_target_key=None, node_target_value=None, properties=None, *, edge=None)
async
Upsert a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str | None
|
The key of the source node.
Deprecated – use |
None
|
node_source_value
|
str | None
|
The value of the source node.
Deprecated – use |
None
|
relation
|
str | None
|
The type of the relationship.
Deprecated – use |
None
|
node_target_key
|
str | None
|
The key of the target node.
Deprecated – use |
None
|
node_target_value
|
str | None
|
The value of the target node.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra relationship properties.
Deprecated – use |
None
|
edge
|
Edge | None
|
An |
None
|
Returns:
| Type | Description |
|---|---|
Edge | None
|
Edge | None: The upserted edge, or |
Neo4jGraphDataStore(uri, user, password, max_connection_pool_size=100, retry_config=None, **kwargs)
Bases: BaseGraphDataStore
Implementation of BaseGraphDataStore for Neo4j.
This class provides an interface for graph-based Retrieval-Augmented Generation (RAG) operations on Neo4j graph databases.
Attributes:
| Name | Type | Description |
|---|---|---|
driver |
Driver
|
The Neo4j driver. |
Example
store = Neo4jGraphDataStore(
uri="bolt://localhost:7687",
user="neo4j",
password="password"
)
# Perform async operations
results = await store.query("MATCH (n) RETURN n")
# Create a node
node = await store.upsert_node("Person", "name", "John", {"age": 30})
Initialize Neo4jGraphDataStore.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uri
|
str
|
The URI of the graph store. |
required |
user
|
str
|
The user of the graph store. |
required |
password
|
str
|
The password of the graph store. |
required |
max_connection_pool_size
|
int
|
The maximum size of the connection pool. Defaults to 100. |
100
|
retry_config
|
RetryConfig | None
|
Configuration for retry behavior. Defaults to None. If provided, query operations will be retried according to the specified RetryConfig parameters. When a database operation fails with a retryable exception (e.g., neo4j.exceptions.ServiceUnavailable), the operation will be automatically retried based on the retry policy defined in the configuration. |
None
|
**kwargs
|
Any
|
Additional keyword arguments for the driver. |
{}
|
close()
async
Close the graph data store.
delete_node(label, identifier_key, identifier_value)
async
Delete a node from the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str
|
The label of the node. |
required |
identifier_key
|
str
|
The key of the identifier. |
required |
identifier_value
|
str
|
The identifier of the node. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
delete_relationship(node_source_key, node_source_value, relation, node_target_key, node_target_value)
async
Delete a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str
|
The key of the source node. |
required |
node_source_value
|
str
|
The identifier of the source node. |
required |
relation
|
str
|
The type of the relationship. |
required |
node_target_key
|
str
|
The key of the target node. |
required |
node_target_value
|
str
|
The identifier of the target node. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
|
query(query, parameters=None)
async
Query the graph store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
The query to be executed. |
required |
parameters
|
dict[str, Any] | None
|
The parameters of the query. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
list[dict[str, Any]]: The result of the query. |
traverse_graph(node_properties, extracted_node_properties=None, extracted_relationship_properties=None, depth=3)
async
Traverse graph from a node with specified properties, ignoring relationship's direction, up to a given depth.
Example
nodes, relationships = await graph_data_store.traverse_graph(
node_properties={"name": "John Doe"},
extracted_node_properties=["name", "age"],
extracted_relationship_properties=["since"],
depth=1
)
Means starting from the node with property name equal to "John Doe", traverse
the graph up to depth 1, extracting the name and age properties from nodes
and the since property from relationships.
nodes, relationships = await graph_data_store.traverse_graph(
node_properties={"name": "John Doe"},
depth=2
)
Means starting from the node with property name equal to "John Doe", traverse
the graph up to depth 2, extracting all properties from nodes and relationships.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_properties
|
dict[str, Any]
|
The properties of the starting node. |
required |
extracted_node_properties
|
list[str] | None
|
The properties to extract from nodes during traversal. If None or empty list, all node properties will be returned. Defaults to None. |
None
|
extracted_relationship_properties
|
list[str] | None
|
The properties to extract from relationships during traversal. If None or empty list, all relationship properties will be returned. Defaults to None. |
None
|
depth
|
int
|
The depth of traversal. Defaults to 3. |
3
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
tuple[list[dict[str, Any]], list[dict[str, Any]]]: A tuple containing two lists: - List of nodes with their extracted properties (including the source node). - List of relationships with their extracted properties. |
list[dict[str, Any]]
|
Example return value: |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
nodes = [ { "id": 1001, "labels": ["Person"], "properties": { "name": "John Doe", "age": 30, "occupation": "Engineer" } }, { "id": 2001, "labels": ["Company"], "properties": { "name": "TechCorp", "industry": "Technology", "employees": 500 } } |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
] |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
relationships = [ { "id": 5002, "type": "FRIEND_OF", "start_node": 1001, "end_node": 1002, "properties": { "since": "2018-05-20", "closeness": 8 } } |
tuple[list[dict[str, Any]], list[dict[str, Any]]]
|
] |
Raises:
| Type | Description |
|---|---|
ValueError
|
If node_properties is empty or depth is less than 1. |
upsert_node(label=None, identifier_key=None, identifier_value=None, properties=None, *, node=None)
async
Upsert a node in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
label
|
str | None
|
The label of the node.
Deprecated – use |
None
|
identifier_key
|
str | None
|
The key of the identifier.
Deprecated – use |
None
|
identifier_value
|
str | None
|
The value of the identifier.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra node properties.
Deprecated – use |
None
|
node
|
Node | None
|
A |
None
|
Returns:
| Type | Description |
|---|---|
Node | None
|
Node | None: The upserted node, or |
upsert_relationship(node_source_key=None, node_source_value=None, relation=None, node_target_key=None, node_target_value=None, properties=None, *, edge=None)
async
Upsert a relationship between two nodes in the graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_source_key
|
str | None
|
The key of the source node.
Deprecated – use |
None
|
node_source_value
|
str | None
|
The value of the source node.
Deprecated – use |
None
|
relation
|
str | None
|
The type of the relationship.
Deprecated – use |
None
|
node_target_key
|
str | None
|
The key of the target node.
Deprecated – use |
None
|
node_target_value
|
str | None
|
The value of the target node.
Deprecated – use |
None
|
properties
|
dict[str, Any] | None
|
Extra relationship properties.
Deprecated – use |
None
|
edge
|
Edge | None
|
An |
None
|
Returns:
| Type | Description |
|---|---|
Edge | None
|
Edge | None: The upserted edge, or |