SQL Data Store

Modules containing SQL data store implementations to be used in Gen AI applications.

QueryFilter

Bases: BaseModel

Model for query filters.

Attributes:

Name Type Description
conditions dict[str, Any]

The conditions for filtering the query.

Example

QueryFilter(conditions={"column1": "value1", "column2": "value2"})
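
The documentation does not spell out how the entries in conditions are combined. The following is a minimal sketch, assuming each key/value pair is an equality test and all pairs are joined with AND. The helper conditions_to_where and the items table are hypothetical, and the sketch uses only the standard library's sqlite3 rather than this module's code.

```python
import sqlite3
from typing import Any


def conditions_to_where(conditions: dict[str, Any], allowed: set[str]) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: render a conditions dict as a parameterized WHERE clause."""
    for column in conditions:
        # Column names cannot be bound as parameters, so check them against a whitelist.
        if column not in allowed:
            raise ValueError(f"Unknown column: {column}")
    # Assumption: every condition is an equality test, and all are joined with AND.
    clause = " AND ".join(f"{column} = :{column}" for column in conditions)
    return clause, dict(conditions)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (column1 TEXT, column2 TEXT)")
conn.executemany(
    "INSERT INTO items VALUES (?, ?)",
    [("value1", "value2"), ("value1", "other"), ("x", "value2")],
)

where, params = conditions_to_where(
    {"column1": "value1", "column2": "value2"},
    allowed={"column1", "column2"},
)
# Only the row matching both conditions is returned.
rows = conn.execute(f"SELECT column1, column2 FROM items WHERE {where}", params).fetchall()
```

Under this assumption, only rows that match every key in conditions are selected.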

QueryOptions

Bases: BaseModel

Model for query options.

Attributes:

Name Type Description
columns Sequence[str] | None

The columns to include in the query result. Defaults to None.

order_by str | None

The column to order the query result by. Defaults to None.

order_desc bool

Whether to order the query result in descending order. Defaults to False.

limit int | None

The maximum number of rows to return. Defaults to None.

Example

QueryOptions(columns=["column1", "column2"], order_by="column1", order_desc=True, limit=10)
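
To make the four options concrete, here is a rough sketch of the SQL they could correspond to. It is an illustration of the documented semantics only, not this module's implementation: the Options dataclass, the build_select helper, and the items table are all hypothetical, and the sketch runs against the standard library's sqlite3.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class Options:
    """Hypothetical stand-in mirroring the QueryOptions fields."""
    columns: Optional[Sequence[str]] = None
    order_by: Optional[str] = None
    order_desc: bool = False
    limit: Optional[int] = None


def build_select(table: str, options: Options, allowed: set[str]) -> tuple[str, dict[str, Any]]:
    """Render a SELECT statement from the options (illustration only)."""
    names = list(options.columns or []) + ([options.order_by] if options.order_by else [])
    for name in names:
        # Identifiers cannot be bound as parameters, so validate them first.
        if name not in allowed:
            raise ValueError(f"Unknown column: {name}")
    selected = ", ".join(options.columns) if options.columns else "*"
    # The table name is a trusted literal in this sketch.
    sql = f"SELECT {selected} FROM {table}"
    params: dict[str, Any] = {}
    if options.order_by:
        sql += f" ORDER BY {options.order_by} {'DESC' if options.order_desc else 'ASC'}"
    if options.limit is not None:
        sql += " LIMIT :limit"
        params["limit"] = options.limit
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (column1 INTEGER, column2 TEXT)")
conn.executemany("INSERT INTO items VALUES (?, ?)", [(1, "a"), (3, "c"), (2, "b")])

sql, params = build_select(
    "items",
    Options(columns=["column1", "column2"], order_by="column1", order_desc=True, limit=2),
    allowed={"column1", "column2"},
)
rows = conn.execute(sql, params).fetchall()
```

With order_desc=True and limit=2, the two rows with the largest column1 values come back first.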

SQLAlchemySQLDataStore(engine_or_url, pool_size=10, max_overflow=10, autoflush=True, **kwargs)

Bases: BaseSQLDataStore

Data store for interacting with SQLAlchemy.

This class provides methods to interact with a SQL database using SQLAlchemy.

Attributes:

Name Type Description
db Session

The SQLAlchemy session object.

engine Engine

The SQLAlchemy engine object.

logger Logger

The logger object.

Initialize SQLAlchemySQLDataStore class.

Parameters:

Name Type Description Default
engine_or_url Engine | str

SQLAlchemy engine object or database URL.

required
pool_size int

The number of database connections to maintain in the pool. Defaults to 10.

10
max_overflow int

The maximum number of connections that may be opened beyond pool_size. Defaults to 10. This parameter is ignored for SQLite.

10
autoflush bool

If True, pending changes in the session are flushed to the database automatically before each query is issued. Defaults to True.

True
**kwargs Any

Additional keyword arguments to support the initialization of the SQLAlchemy adapter.

{}

Raises:

Type Description
ValueError

If the database adapter is not initialized.

create(model)

Inserts data into the database using SQLAlchemy ORM.

This method provides a structured way to insert data using ORM models.

Parameters:

Name Type Description Default
model DeclarativeBase | list[DeclarativeBase]

An instance or list of instances of SQLAlchemy model to be inserted.

required
Example

To insert a row into a table:

data_store.create(MyModel(column1="value1", column2="value2"))

To insert multiple rows:

data_store.create([
    MyModel(column1="value1", column2="value2"),
    MyModel(column1="value3", column2="value4")
])

Raises:

Type Description
RuntimeError

If the insertion fails.

RuntimeError

If an unexpected error occurs.

delete(model_class, filters=None, allow_delete_all=False, **kwargs)

Deletes data from the database using SQLAlchemy ORM.

This method provides a structured way to delete data using ORM models.

Parameters:

Name Type Description Default
model_class Type[DeclarativeBase]

The SQLAlchemy model class to delete.

required
filters QueryFilter | None

Filters to apply to the query. Defaults to None.

None
allow_delete_all bool

If True, allows deletion of all records. Defaults to False.

False
**kwargs Any

Additional keyword arguments to support the delete method.

{}
Example

To delete a row from a table:

data_store.delete(
    MyModel,
    filters=QueryFilter(conditions={"id": 1})
)

Raises:

Type Description
ValueError

If no filters are provided and allow_delete_all is False (to prevent accidental deletion of all records).

RuntimeError

If the delete operation fails.

RuntimeError

If an unexpected error occurs.
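
The interaction between filters and allow_delete_all can be sketched as a small guard. This is an assumption about the behaviour described above, not the module's actual code: the function check_delete_allowed is hypothetical, and treating an empty conditions dict the same as no filters is also an assumption.

```python
from typing import Any, Optional


def check_delete_allowed(
    conditions: Optional[dict[str, Any]],
    allow_delete_all: bool = False,
) -> None:
    """Hypothetical guard mirroring the documented ValueError behaviour."""
    # Assumption: an empty conditions dict counts as "no filters".
    if not conditions and not allow_delete_all:
        raise ValueError(
            "Refusing to delete all records: pass filters or set allow_delete_all=True."
        )


# A filtered delete passes the guard.
check_delete_allowed({"id": 1})

# Deleting everything passes only with an explicit opt-in.
check_delete_allowed(None, allow_delete_all=True)

# Without filters and without the opt-in, the guard raises.
try:
    check_delete_allowed(None)
    blocked = False
except ValueError:
    blocked = True
```

The design choice is that wiping a table takes a deliberate second argument rather than the omission of one.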

query(query, params=None) async

Executes raw SQL queries.

Preferred for complex queries, when working with legacy schemas without ORM models, or when using an LLM to generate your SQL queries. Use this method when you need advanced SQL operations not supported by read().

Parameters:

Name Type Description Default
query str

The query string with optional :param style parameters.

required
params dict[str, Any] | None

Parameters to bind to the query. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: The result of the query.
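
Because query is marked async, it has to be awaited from a coroutine. The sketch below shows that calling pattern with a hypothetical StubDataStore backed by the standard library's sqlite3. The stub returns a list of dicts to stay dependency-free, whereas the real method is documented to return a DataFrame.

```python
import asyncio
import sqlite3
from typing import Any, Optional


class StubDataStore:
    """Hypothetical stand-in exposing the same async query(query, params) shape."""

    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")
        self._conn.row_factory = sqlite3.Row
        self._conn.execute("CREATE TABLE users (id INTEGER, last_name TEXT)")
        self._conn.execute("INSERT INTO users VALUES (1, 'Smith')")

    async def query(
        self, query: str, params: Optional[dict[str, Any]] = None
    ) -> list[dict[str, Any]]:
        # Returns plain dicts here; the documented method returns a DataFrame.
        cursor = self._conn.execute(query, params or {})
        return [dict(row) for row in cursor.fetchall()]


async def main() -> list[dict[str, Any]]:
    data_store = StubDataStore()
    # Without `await`, this call would only create a coroutine object.
    return await data_store.query(
        "SELECT id, last_name FROM users WHERE id = :id",
        {"id": 1},
    )


result = asyncio.run(main())
```

In an application that already runs an event loop, the await data_store.query(...) line is used directly instead of asyncio.run.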

Note

Interpolating values directly into the query string is unsafe and leaves the query vulnerable to SQL injection. Avoid patterns such as the following:

name = "O'Connor"
query = f"SELECT * FROM users WHERE last_name = '{name}'"

or

query = "SELECT * FROM users WHERE last_name = '" + name + "'"

Instead, please use parameterized queries with :param style notation as follows:

query = "SELECT * FROM users WHERE last_name = :last_name"
params = {"last_name": "O'Connor"}
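
The difference can be seen end to end with a small runnable sketch. It uses the standard library's sqlite3, which accepts the same :param notation, rather than the data store itself. The users table and its contents are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, last_name TEXT)")
conn.executemany("INSERT INTO users (last_name) VALUES (?)", [("O'Connor",), ("Smith",)])

name = "O'Connor"

# Unsafe: the apostrophe in the value terminates the SQL string literal early,
# so the statement is malformed (a crafted value could inject arbitrary SQL).
unsafe_query = f"SELECT * FROM users WHERE last_name = '{name}'"
try:
    conn.execute(unsafe_query)
    unsafe_failed = False
except sqlite3.OperationalError:
    unsafe_failed = True

# Safe: the value is passed separately and bound by the driver, never parsed as SQL.
query = "SELECT * FROM users WHERE last_name = :last_name"
params = {"last_name": name}
rows = conn.execute(query, params).fetchall()
```

The interpolated version breaks on an ordinary surname, while the parameterized version returns the matching row unchanged.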

Raises:

Type Description
RuntimeError

If the query fails.

RuntimeError

If an unexpected error occurs.

read(model_class, filters=None, options=None)

Reads data from the database using SQLAlchemy ORM with a structured, type-safe interface.

This method provides a high-level interface for querying data using ORM models. It supports filtering, column selection, ordering, and limiting results through a type-safe interface.

Parameters:

Name Type Description Default
model_class Type[DeclarativeBase]

The SQLAlchemy model class to query.

required
filters QueryFilter | None

Optional query filters containing column-value pairs to filter the results. Defaults to None.

None
options QueryOptions | None

Optional query configuration. Defaults to None. It includes:
- columns: Specific columns to select
- order_by: Column to sort by
- order_desc: Sort order (ascending/descending)
- limit: Maximum number of results

None

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame containing the query results.

Example

data_store.read(
    Message,
    filters=QueryFilter(conditions={"conversation_id": "123"}),
    options=QueryOptions(
        columns=["role", "content"],
        order_by="created_at",
        order_desc=True,
        limit=10
    )
)

Raises:

Type Description
RuntimeError

If the read operation fails.

RuntimeError

If an unexpected error occurs.

update(model_class, update_values, filters=None, **kwargs)

Updates data in the database using SQLAlchemy ORM.

This method provides a structured way to update data using ORM models.

Parameters:

Name Type Description Default
model_class Type[DeclarativeBase]

The SQLAlchemy model class to update.

required
update_values dict[str, Any]

Values to update.

required
filters QueryFilter | None

Filters to apply to the query. Defaults to None.

None
**kwargs Any

Additional keyword arguments to support the update method.

{}
Example

To update a row in a table:

data_store.update(
    MyModel,
    update_values={"column1": "new_value"},
    filters=QueryFilter(conditions={"id": 1}),
)

Raises:

Type Description
RuntimeError

If the update operation fails.

RuntimeError

If an unexpected error occurs.