SQL Data Store
Modules containing SQL data store implementations to be used in Gen AI applications.
QueryFilter
Bases: BaseModel
Model for query filters.
Attributes:
Name | Type | Description |
---|---|---|
conditions | dict[str, Any] | The conditions for filtering the query. |
Example
QueryFilter(conditions={"column1": "value1", "column2": "value2"})
QueryOptions
Bases: BaseModel
Model for query options.
Attributes:
Name | Type | Description |
---|---|---|
columns | Sequence[str] \| None | The columns to include in the query result. Defaults to None. |
order_by | str \| None | The column to order the query result by. Defaults to None. |
order_desc | bool | Whether to order the query result in descending order. Defaults to False. |
limit | int \| None | The maximum number of rows to return. Defaults to None. |
Example
QueryOptions(columns=["column1", "column2"], order_by="column1", order_desc=True, limit=10)
SQLAlchemySQLDataStore(engine_or_url, pool_size=10, max_overflow=10, autoflush=True, **kwargs)
Bases: BaseSQLDataStore
Data store for interacting with SQLAlchemy.
This class provides methods to interact with a SQL database using SQLAlchemy.
Attributes:
Name | Type | Description |
---|---|---|
db | Session | The SQLAlchemy session object. |
engine | Engine | The SQLAlchemy engine object. |
logger | Logger | The logger object. |
Initialize SQLAlchemySQLDataStore class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
engine_or_url | Engine \| str | SQLAlchemy engine object or database URL. | required |
pool_size | int | The number of database connections to maintain in the pool. Defaults to 10. | 10 |
max_overflow | int | The maximum overflow size of the pool. Defaults to 10. This parameter is ignored for SQLite. | 10 |
autoflush | bool | If True, all changes to the database are flushed immediately. Defaults to True. | True |
**kwargs | Any | Additional keyword arguments to support the initialization of the SQLAlchemy adapter. | {} |
Raises:
Type | Description |
---|---|
ValueError | If the database adapter is not initialized. |
create(model)
Inserts data into the database using SQLAlchemy ORM.
This method provides a structured way to insert data using ORM models.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model | DeclarativeBase \| list[DeclarativeBase] | An instance or list of instances of a SQLAlchemy model to be inserted. | required |
Example
To insert a row into a table:
data_store.create(MyModel(column1="value1", column2="value2"))
To insert multiple rows:
data_store.create([
    MyModel(column1="value1", column2="value2"),
    MyModel(column1="value3", column2="value4"),
])
Raises:
Type | Description |
---|---|
RuntimeError | If the insertion fails. |
RuntimeError | If an unexpected error occurs. |
delete(model_class, filters=None, allow_delete_all=False, **kwargs)
Deletes data from the database using SQLAlchemy ORM.
This method provides a structured way to delete data using ORM models.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_class | Type[DeclarativeBase] | The SQLAlchemy model class to delete. | required |
filters | QueryFilter \| None | Filters to apply to the query. Defaults to None. | None |
allow_delete_all | bool | If True, allows deletion of all records. Defaults to False. | False |
**kwargs | Any | Additional keyword arguments to support the delete method. | {} |
Example
To delete a row from a table:
data_store.delete(
    MyModel,
    filters=QueryFilter(conditions={"id": 1}),
)
Raises:
Type | Description |
---|---|
ValueError | If no filters are provided and allow_delete_all is False (to prevent accidental deletion of all records). |
RuntimeError | If the delete operation fails. |
RuntimeError | If an unexpected error occurs. |
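The guard against accidental full-table deletes can be illustrated without the data store itself. The following is a minimal sketch using Python's built-in sqlite3 module; `guarded_delete`, the `items` table, and the equality-only matching are hypothetical and are not part of this library's API or a description of its internals.

```python
import sqlite3


def guarded_delete(conn, table, conditions=None, allow_delete_all=False):
    """Hypothetical helper: delete rows matching equality conditions.

    Table and column names are interpolated as identifiers, so they must
    come from trusted code; only the values are bound as parameters.
    """
    if not conditions and not allow_delete_all:
        # Mirrors the documented behaviour: no filters means no delete.
        raise ValueError("Refusing to delete all rows without filters.")
    sql = f"DELETE FROM {table}"
    params = {}
    if conditions:
        sql += " WHERE " + " AND ".join(f"{col} = :{col}" for col in conditions)
        params = dict(conditions)
    conn.execute(sql, params)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO items (id, name) VALUES (?, ?)",
    [(1, "a"), (2, "b"), (3, "c")],
)

# A filtered delete removes only the matching row.
guarded_delete(conn, "items", {"id": 1})
remaining = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]

# An unfiltered delete is rejected unless explicitly allowed.
try:
    guarded_delete(conn, "items")
    blocked = False
except ValueError:
    blocked = True

guarded_delete(conn, "items", allow_delete_all=True)
remaining_after_all = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
```

Requiring an explicit opt-in flag keeps the common call (with filters) short while making a full-table delete a deliberate choice.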
query(query, params=None)
async
Executes raw SQL queries.
Preferred for complex queries, when working with legacy schemas without ORM models, or when using an LLM to generate your SQL queries. Use this method when you need advanced SQL operations not supported by read().
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str | The query string with optional :param style parameters. | required |
params | dict[str, Any] \| None | Parameters to bind to the query. Defaults to None. | None |
Returns:
Type | Description |
---|---|
DataFrame | The result of the query, as a pandas DataFrame. |
Note
Interpolating values directly into the query string is unsafe and vulnerable to SQL injection. Avoid patterns such as the following:
name = "O'Connor"
query = f"SELECT * FROM users WHERE last_name = '{name}'"
or
query = "SELECT * FROM users WHERE last_name = '" + name + "'"
Instead, use parameterized queries with :param style notation:
query = "SELECT * FROM users WHERE last_name = :last_name"
params = {"last_name": "O'Connor"}
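The difference can be checked end to end with Python's built-in sqlite3 module, which accepts the same :name placeholder style. This is a minimal sketch: the `users` table and its rows are made up for illustration, and the snippet does not call the data store itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, last_name TEXT)")
conn.executemany(
    "INSERT INTO users (last_name) VALUES (?)",
    [("O'Connor",), ("Smith",)],
)

name = "O'Connor"

# Unsafe: the apostrophe in the value ends the SQL string literal early,
# so the statement is malformed (or, with hostile input, rewritten).
unsafe_query = f"SELECT last_name FROM users WHERE last_name = '{name}'"
try:
    conn.execute(unsafe_query)
    unsafe_failed = False
except sqlite3.Error:
    unsafe_failed = True

# Safe: the value is bound separately and never parsed as SQL.
safe_query = "SELECT last_name FROM users WHERE last_name = :last_name"
rows = conn.execute(safe_query, {"last_name": name}).fetchall()
```

With binding, the driver handles quoting, so values containing apostrophes or SQL keywords are treated purely as data.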
Raises:
Type | Description |
---|---|
RuntimeError | If the query fails. |
RuntimeError | If an unexpected error occurs. |
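Because query is marked async, it has to be awaited from a coroutine or driven by an event loop. The sketch below shows only that calling pattern; `StubDataStore` is a hypothetical stand-in that echoes its arguments and is not the real data store (which, per the table above, returns a DataFrame).

```python
import asyncio


class StubDataStore:
    """Hypothetical stand-in; it only mimics that query() is a coroutine."""

    async def query(self, query, params=None):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return {"query": query, "params": params or {}}


async def main(data_store):
    # Inside a coroutine, await the call to get the result.
    return await data_store.query(
        "SELECT * FROM users WHERE last_name = :last_name",
        {"last_name": "O'Connor"},
    )


# From synchronous code, asyncio.run drives the coroutine to completion.
result = asyncio.run(main(StubDataStore()))

# Calling without await only creates a coroutine object; nothing runs yet.
pending = StubDataStore().query("SELECT 1")
is_coroutine = asyncio.iscoroutine(pending)
pending.close()  # discard it explicitly to avoid a "never awaited" warning
```

In code that already runs inside an event loop (for example, an async web handler), use `await` directly instead of `asyncio.run`.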
read(model_class, filters=None, options=None)
Reads data from the database using SQLAlchemy ORM with a structured, type-safe interface.
This method provides a high-level interface for querying data using ORM models. It supports filtering, column selection, ordering, and limiting results through a type-safe interface.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_class | Type[DeclarativeBase] | The SQLAlchemy model class to query. | required |
filters | QueryFilter \| None | Optional query filters containing column-value pairs to filter the results. Defaults to None. | None |
options | QueryOptions \| None | Optional query configuration: columns (specific columns to select), order_by (column to sort by), order_desc (sort order, ascending or descending), and limit (maximum number of results). Defaults to None. | None |
Returns:
Type | Description |
---|---|
DataFrame | A pandas DataFrame containing the query results. |
Example
data_store.read(
    Message,
    filters=QueryFilter(conditions={"conversation_id": "123"}),
    options=QueryOptions(
        columns=["role", "content"],
        order_by="created_at",
        order_desc=True,
        limit=10,
    ),
)
Raises:
Type | Description |
---|---|
RuntimeError | If the read operation fails. |
RuntimeError | If an unexpected error occurs. |
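To make the roles of the filters and options concrete, here is one way column-value filters, column selection, ordering, and a row limit could map onto a single SELECT statement. This is a sketch under stated assumptions: `build_select`, the `messages` table, and the equality-only matching are hypothetical, it uses the built-in sqlite3 module rather than SQLAlchemy, and it is not a description of how read() is implemented.

```python
import sqlite3


def build_select(table, conditions=None, columns=None,
                 order_by=None, order_desc=False, limit=None):
    """Hypothetical helper: build a parameterized SELECT.

    Identifiers (table, columns, order_by) are interpolated and must come
    from trusted code; filter values and the limit are bound as parameters.
    """
    selected = ", ".join(columns) if columns else "*"
    sql = f"SELECT {selected} FROM {table}"
    params = {}
    if conditions:
        sql += " WHERE " + " AND ".join(f"{col} = :{col}" for col in conditions)
        params.update(conditions)
    if order_by:
        sql += f" ORDER BY {order_by}" + (" DESC" if order_desc else " ASC")
    if limit is not None:
        sql += " LIMIT :row_limit"
        params["row_limit"] = limit
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages "
    "(conversation_id TEXT, role TEXT, content TEXT, created_at INTEGER)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?)",
    [
        ("123", "user", "hi", 1),
        ("123", "assistant", "hello", 2),
        ("456", "user", "other", 3),
    ],
)

sql, params = build_select(
    "messages",
    conditions={"conversation_id": "123"},
    columns=["role", "content"],
    order_by="created_at",
    order_desc=True,
    limit=10,
)
rows = conn.execute(sql, params).fetchall()
```

The call mirrors the read() example above: only the two requested columns come back, newest first, restricted to conversation "123".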
update(model_class, update_values, filters=None, **kwargs)
Updates data in the database using SQLAlchemy ORM.
This method provides a structured way to update data using ORM models.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_class | Type[DeclarativeBase] | The SQLAlchemy model class to update. | required |
update_values | dict[str, Any] | Values to update. | required |
filters | QueryFilter \| None | Filters to apply to the query. Defaults to None. | None |
**kwargs | Any | Additional keyword arguments to support the update method. | {} |
Example
To update a row in a table:
data_store.update(
    MyModel,
    update_values={"column1": "new_value"},
    filters=QueryFilter(conditions={"id": 1}),
)
Raises:
Type | Description |
---|---|
RuntimeError | If the update operation fails. |
RuntimeError | If an unexpected error occurs. |