Using Database Triggers in SQLAlchemy
Triggers are a powerful feature of relational databases that allow you to execute custom code when specific events occur on a table. For instance, a trigger can automatically populate an audit log table whenever a mutation is applied to another table. This ensures that all changes (including those made by other applications) are recorded and enforced at the database level, reducing the need for additional application code.
This guide explains how to attach triggers to your SQLAlchemy models and configure schema migrations to manage both the triggers and the models as a single migration unit using Atlas.
Atlas's support for triggers, used in this guide, is available exclusively to Pro users. To use this feature, run:
atlas login
Getting started with Atlas and SQLAlchemy
Before we continue, ensure you have installed the Atlas SQLAlchemy Provider in your SQLAlchemy project.
To set up, follow the getting started guide for SQLAlchemy and Atlas.
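For reference, a minimal provider setup looks roughly like the following; the model path and the dialect below are assumptions and should match your project. After installing the provider (pip install atlas-provider-sqlalchemy), register it as an external_schema data source in atlas.hcl:
data "external_schema" "sqlalchemy" {
  # Load the desired schema state from the SQLAlchemy models.
  program = [
    "atlas-provider-sqlalchemy",
    "--path", "./models",     # assumed location of the models package
    "--dialect", "postgresql" # or: mysql | mariadb | sqlite | mssql
  ]
}
The composite_schema blocks shown later in this guide refer to this data source as data.external_schema.sqlalchemy.url.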
Composite Schema
The SQLAlchemy package is mostly used for defining tables (our Python models) and interacting with the database. Triggers, like other database-native objects, have no representation in SQLAlchemy models. Note that a trigger function can be defined once and reused by multiple triggers on different tables.
In order to extend our database schema to include both our SQLAlchemy models and their triggers, we configure Atlas to read the state of the schema from a Composite Schema data source. Composite schemas allow composing multiple Atlas schemas into a unified schema graph; in this case, our SQL trigger file and our Python models.
Follow the steps below to configure this for your project:
1. Let's define a simple model with two classes (tables): User and UserAuditLogs:
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# The audited table.
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False)

# Stores one row per INSERT/UPDATE/DELETE on the users table.
class UserAuditLogs(Base):
    __tablename__ = 'user_audit_logs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    operation_type = Column(String, nullable=False)
    operation_time = Column(String, nullable=False)
    old_value = Column(String, nullable=True)
    new_value = Column(String, nullable=True)
Now, suppose we want to log every change to the users table and save it in the user_audit_logs table. To achieve this, we need to create a trigger function that fires on INSERT, UPDATE, and DELETE operations and attach it to the users table.
2. Next, we define a trigger function (audit_users_changes) and attach it to the users table using CREATE TRIGGER statements. We do so in a separate SQL file, as SQLAlchemy does not support declaring triggers natively.
- PostgreSQL
- MySQL / MariaDB
- SQLite
- Microsoft SQL Server
-- Function to audit changes in the users table.
CREATE OR REPLACE FUNCTION audit_users_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
        VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
        RETURN NEW;
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
        VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
        RETURN NEW;
    ELSIF (TG_OP = 'DELETE') THEN
        INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
        VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit
AFTER INSERT ON users
FOR EACH ROW
EXECUTE FUNCTION audit_users_changes();
-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit
AFTER UPDATE ON users
FOR EACH ROW
EXECUTE FUNCTION audit_users_changes();
-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit
AFTER DELETE ON users
FOR EACH ROW
EXECUTE FUNCTION audit_users_changes();
-- Procedure to audit changes in the users table.
DELIMITER $$
CREATE PROCEDURE audit_users_changes(
    operation_type VARCHAR(10),
    old_value JSON,
    new_value JSON
)
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
    VALUES (operation_type, CURRENT_TIMESTAMP, old_value, new_value);
END$$

-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit
AFTER INSERT ON users
FOR EACH ROW
BEGIN
    CALL audit_users_changes(
        'INSERT',
        NULL,
        JSON_OBJECT('id', NEW.id, 'name', NEW.name)
    );
END$$

-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    CALL audit_users_changes(
        'UPDATE',
        JSON_OBJECT('id', OLD.id, 'name', OLD.name),
        JSON_OBJECT('id', NEW.id, 'name', NEW.name)
    );
END$$

-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit
AFTER DELETE ON users
FOR EACH ROW
BEGIN
    CALL audit_users_changes(
        'DELETE',
        JSON_OBJECT('id', OLD.id, 'name', OLD.name),
        NULL
    );
END$$
DELIMITER ;
-- SQLite does not support stored functions, so each trigger inserts
-- into the audit table directly.
-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit
AFTER INSERT ON users
FOR EACH ROW
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
    VALUES ('INSERT', datetime('now'), json_object('id', NEW.id, 'name', NEW.name));
END;

-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
    VALUES ('UPDATE', datetime('now'), json_object('id', OLD.id, 'name', OLD.name), json_object('id', NEW.id, 'name', NEW.name));
END;

-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit
AFTER DELETE ON users
FOR EACH ROW
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
    VALUES ('DELETE', datetime('now'), json_object('id', OLD.id, 'name', OLD.name));
END;
-- SQL Server triggers read changed rows from the inserted/deleted
-- pseudo-tables, so no separate function is needed.
-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit
ON users
AFTER INSERT
AS
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
    SELECT 'INSERT', GETDATE(),
           (SELECT i.id, i.name FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
    FROM inserted AS i;
END;

-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit
ON users
AFTER UPDATE
AS
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
    SELECT 'UPDATE', GETDATE(),
           (SELECT d.id, d.name FOR JSON PATH, WITHOUT_ARRAY_WRAPPER),
           (SELECT i.id, i.name FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
    FROM inserted AS i
    JOIN deleted AS d ON d.id = i.id;
END;

-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit
ON users
AFTER DELETE
AS
BEGIN
    INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
    SELECT 'DELETE', GETDATE(),
           (SELECT d.id, d.name FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
    FROM deleted AS d;
END;
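In this guide, the trigger definitions above are assumed to live in triggers/schema.sql. The layout below is only a suggestion; the file:// URL in the next step should match wherever you place the file:
.
├── atlas.hcl      # Atlas configuration (next step)
├── models.py      # the SQLAlchemy models from step 1
└── triggers/
    └── schema.sql # the trigger definitions from step 2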
3. In your atlas.hcl config file, add a composite_schema that includes both our SQLAlchemy models and the custom triggers defined in schema.sql:
- PostgreSQL
- MySQL
- MariaDB
- SQLite
- Microsoft SQL Server
data "composite_schema" "app" {
# Load the SQLAlchemy model first
schema "public" {
url = data.external_schema.sqlalchemy.url
}
# Next, load the trigger.
schema "public" {
url = "file://triggers/schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "docker://postgres/15/dev?search_path=public"
}
data "composite_schema" "app" {
# Load the SQLAlchemy model first
schema "public" {
url = data.external_schema.sqlalchemy.url
}
# Next, load the trigger.
schema "public" {
url = "file://triggers/schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "docker://mysql/8/dev"
}
data "composite_schema" "app" {
# Load the SQLAlchemy model first
schema "public" {
url = data.external_schema.sqlalchemy.url
}
# Next, load the trigger.
schema "public" {
url = "file://triggers/schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "docker://mariadb/latest/dev"
}
data "composite_schema" "app" {
# Load the SQLAlchemy model first
schema "public" {
url = data.external_schema.sqlalchemy.url
}
# Next, load the trigger.
schema "public" {
url = "file://triggers/schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "sqlite://dev?mode=memory"
}
data "composite_schema" "app" {
# Load the SQLAlchemy model first
schema "public" {
url = data.external_schema.sqlalchemy.url
}
# Next, load the trigger.
schema "public" {
url = "file://triggers/schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "docker://sqlserver/2022-latest"
}
Usage
After setting up our composite schema, we can get its representation using the atlas schema inspect
command, generate
schema migrations for it, apply them to a database, and more. Below are a few examples:
Inspect the Schema
The atlas schema inspect command is commonly used to inspect databases. However, we can also use it to inspect our composite_schema and print its SQL representation:
atlas schema inspect \
  --env local \
  --url env://src \
  --format '{{ sql . }}'
The command above prints the following SQL. Note that the audit_users_changes function and the triggers are defined after the users and user_audit_logs tables:
-- Create "user_audit_logs" table
CREATE TABLE "user_audit_logs" ("id" serial NOT NULL, "operation_type" character varying NOT NULL, "operation_time" character varying NOT NULL, "old_value" character varying NULL, "new_value" character varying NULL, PRIMARY KEY ("id"));
-- Create "users" table
CREATE TABLE "users" ("id" serial NOT NULL, "name" character varying NOT NULL, PRIMARY KEY ("id"));
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
Generate Migrations For the Schema
To generate a migration for the schema, run the following command:
atlas migrate diff --env local
Note that a new migration file is created with the following contents:
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
Apply the Migrations
To apply the migration generated above to a database, run the following command:
- PostgreSQL
- MySQL
- MariaDB
- SQLite
- Microsoft SQL Server
atlas migrate apply \
  --env local \
  --url "postgres://localhost/db?sslmode=disable" # replace with your PostgreSQL database URL
atlas migrate apply \
  --env local \
  --url "mysql://localhost:3306/db" # replace with your MySQL database URL
atlas migrate apply \
  --env local \
  --url "maria://localhost/db" # replace with your MariaDB database URL
atlas migrate apply \
  --env local \
  --url "sqlite://db" # replace with your SQLite database URL
atlas migrate apply \
  --env local \
  --url "sqlserver://localhost:1433?database=master" # replace with your SQL Server database URL