Rust 2026 经验谈 - 数据库与 ORM 选型
2755 字
14 分钟
Rust 2026 经验谈 - 数据库与 ORM 选型
数据库是后端服务的核心。Rust 的数据库生态在 2024-2026 年间快速成熟——SQLx 的编译时检查、SeaORM 的异步 ORM、Diesel 的类型安全查询构建器各有拥趸。本文从选型对比、连接池、migration、嵌入式数据库、事务模式五个方面,给出实战经验。
SQLx vs SeaORM vs Diesel 选型对比
SQLx:编译时 SQL 检查
SQLx 的核心卖点是编译时验证 SQL——如果 SQL 语法错误或表/列不存在,编译直接失败。
[dependencies]sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "uuid", "migrate"] }use sqlx::postgres::PgPool;
async fn create_user(pool: &PgPool, name: &str, email: &str) -> Result<User, sqlx::Error> { // 编译时检查:如果 users 表不存在或列名错误,cargo build 直接报错 let user = sqlx::query_as!( User, r#"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email, created_at"#, name, email ) .fetch_one(pool) .await?; Ok(user)}
async fn list_users(pool: &PgPool) -> Result<Vec<User>, sqlx::Error> { let users = sqlx::query_as!( User, r#"SELECT id, name, email, created_at FROM users ORDER BY id LIMIT 100"# ) .fetch_all(pool) .await?; Ok(users)}编译时检查的工作方式:SQLx 在 build.rs 中连接数据库(或读取离线查询数据),执行 EXPLAIN 验证 SQL。需要设置 DATABASE_URL 环境变量。
SQLx 的 query! vs query_as! vs query_scalar!
// query!:返回匿名列,字段名与 SQL 列名一致let row = sqlx::query!( r#"SELECT id, name FROM users WHERE id = $1"#, user_id).fetch_one(pool).await?;// row.id: i32, row.name: String
// query_as!:映射到自定义结构体#[derive(sqlx::FromRow)]struct UserRow { id: i32, name: String,}let user = sqlx::query_as!( UserRow, r#"SELECT id, name FROM users WHERE id = $1"#, user_id).fetch_one(pool).await?;
// query_scalar!:返回单个值let count: i64 = sqlx::query_scalar!( r#"SELECT COUNT(*) FROM users"#).fetch_one(pool).await?;SeaORM:异步 ORM
SeaORM 是基于 SQLx 构建的异步 ORM,提供实体生成、关系映射、活跃记录模式:
[dependencies]sea-orm = { version = "1.1", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros"] }# 从数据库生成实体sea-orm-cli generate entity \ -u postgres://user:pass@localhost/db \ -o src/entities \ --with-serde bothuse sea_orm::{Database, EntityTrait, QueryFilter, Set, ActiveModelTrait};use entity::user;
let db = Database::connect("postgres://user:pass@localhost/db").await?;
// 查询let users = user::Entity::find().all(&db).await?;
// 条件查询let alice = user::Entity::find() .filter(user::Column::Name.eq("Alice")) .one(&db) .await?;
// 插入let new_user = user::ActiveModel { name: Set("Bob".into()), email: Set("bob@example.com".into()), ..Default::default()};let result = new_user.insert(&db).await?;
// 更新let mut model: user::ActiveModel = alice.unwrap().into();model.name = Set("Alice Updated".into());model.update(&db).await?;
// 删除user::Entity::delete_by_id(1).exec(&db).await?;Diesel:类型安全查询构建器
Diesel 是 Rust 最早的 ORM,以强类型查询构建器闻名:
[dependencies]diesel = { version = "2.2", features = ["postgres", "chrono", "uuid"] }use diesel::prelude::*;use diesel::result::QueryResult;
// schema.rs 由 diesel print-schema 生成table! { users (id) { id -> Int4, name -> Varchar, email -> Varchar, created_at -> Timestamp, }}
#[derive(Queryable, Selectable)]#[diesel(table_name = users)]struct User { id: i32, name: String, email: String, created_at: chrono::NaiveDateTime,}
#[derive(Insertable)]#[diesel(table_name = users)]struct NewUser<'a> { name: &'a str, email: &'a str,}
fn create_user(conn: &mut PgConnection, name: &str, email: &str) -> QueryResult<User> { let new_user = NewUser { name, email }; diesel::insert_into(users::table) .values(&new_user) .returning(User::as_returning()) .get_result(conn)}
fn list_users(conn: &mut PgConnection) -> QueryResult<Vec<User>> { users::table .order(users::id.asc()) .limit(100) .load(conn)}三者对比
| 维度 | SQLx | SeaORM | Diesel |
|---|---|---|---|
| 异步 | 原生 async | 原生 async | 同步(diesel-async 可选) |
| 编译时检查 | SQL 语法+schema | 无(运行时) | schema + 类型 |
| 查询方式 | 原始 SQL | 链式 API | 链式 API |
| 学习曲线 | 低(SQL 优先) | 中 | 中高 |
| 代码生成 | 无 | sea-orm-cli | diesel-cli |
| 关系映射 | 手动 JOIN | 自动 | 手动/半自动 |
| 迁移支持 | 内置 | 内置 | diesel-cli |
| 数据库支持 | Pg/MySQL/SQLite | Pg/MySQL/SQLite | Pg/MySQL/SQLite |
| 性能 | 最高(薄封装) | 高 | 高 |
| 生态成熟度 | 高 | 中高 | 最高(历史最久) |
选型建议
选 SQLx 的场景:
- 团队 SQL 能力强,偏好手写 SQL
- 需要编译时 SQL 验证
- 追求最高性能
- 简单 CRUD,不需要复杂关系映射
选 SeaORM 的场景:
- 需要异步 ORM
- 表关系复杂,需要自动 JOIN
- 偏好链式 API
- 已有 SQLx 经验
选 Diesel 的场景:
- 同步场景(CLI、批处理)
- 需要最强的类型安全保证
- 已有 Diesel 代码库
- 复杂查询构建
踩坑:SQLx 编译时检查的 CI 集成
# SQLx 编译时检查需要数据库——CI 中如何处理?
# 方案 1:离线模式(推荐)# 先在本地生成离线查询数据# DATABASE_URL=postgres://... cargo sqlx prepare --check
# 提交 .sqlx 目录到 git# CI 中不需要数据库
# .github/workflows/ci.yml- name: Check with offline queries run: cargo check env: SQLX_OFFLINE: true
# 方案 2:CI 中启动数据库- name: Start Postgres run: | docker run -d -p 5432:5432 \ -e POSTGRES_PASSWORD=password \ postgres:16- name: Wait for DB run: sleep 5- name: Check run: cargo check env: DATABASE_URL: postgres://postgres:password@localhost/postgres连接池管理
sqlx::Pool 内置连接池
use sqlx::postgres::{PgPool, PgPoolOptions};use std::time::Duration;
let pool = PgPoolOptions::new() .max_connections(20) .min_connections(5) .acquire_timeout(Duration::from_secs(5)) .idle_timeout(Duration::from_secs(600)) .max_lifetime(Duration::from_secs(1800)) .connect("postgres://user:pass@localhost/db") .await?;
// 使用let row = sqlx::query!("SELECT 1") .fetch_one(&pool) .await?;连接池配置建议:
max_connections = CPU 核数 * 2 + 有效磁盘数// 例如 8 核 + 1 SSD = 17
min_connections = 2~5(减少冷启动延迟)
acquire_timeout = 5~10s(超过说明池耗尽)
idle_timeout = 600s(10 分钟无活动关闭)
max_lifetime = 1800s(30 分钟强制轮换,避免数据库侧超时)deadpool:通用连接池
deadpool 是不绑定数据库的通用连接池,适用于非 SQLx 驱动:
[dependencies]deadpool = "0.12"deadpool-postgres = "0.14"tokio-postgres = "0.7"use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};use tokio_postgres::NoTls;
let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Verified, ..Default::default()};
let mgr = Manager::from_config( "postgres://user:pass@localhost/db".parse()?, NoTls, mgr_config,);
let pool = Pool::builder(mgr) .max_size(20) .build()?;
let client = pool.get().await?;let rows = client.query("SELECT 1", &[]).await?;连接池监控
// SQLx pool 状态let pool_stats = pool.size();println!( "Pool: total={}, idle={}, busy={}", pool_stats, pool.num_idle(), pool_stats - pool.num_idle(),);踩坑:连接泄漏
// ❌ 获取连接后 panic 或忘记归还async fn bad(pool: &PgPool) { let conn = pool.acquire().await.unwrap(); // 如果这里 panic,连接可能不归还 panic!("oops");}
// ✓ 使用作用域守卫(SQLx 自动管理)async fn good(pool: &PgPool) { // pool.acquire() 返回 PoolConnection // Drop 时自动归还连接 let mut conn = pool.acquire().await?; sqlx::query!("SELECT 1").execute(&mut *conn).await?; // conn Drop → 归还连接}Migration 策略
sqlx-cli
# 安装cargo install sqlx-cli --no-default-features --features postgres
# 创建 migrationsqlx migrate add create_users_table
# 生成的文件:migrations/20240101000000_create_users_table.sql-- migrations/20240101000000_create_users_table.sqlCREATE TABLE users ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, email VARCHAR(255) UNIQUE NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW());# 执行 migrationsqlx migrate run
# 回滚(需要手动编写 down migration)sqlx migrate revert
# 检查状态sqlx migrate info在应用中嵌入 migration
use sqlx::migrate::Migrator;
// 编译时嵌入 migration 文件static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> { MIGRATOR.run(pool).await}
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { let pool = PgPool::connect(&env::var("DATABASE_URL")?).await?; run_migrations(&pool).await?; // 启动服务... Ok(())}sea-orm-cli migration
# 安装cargo install sea-orm-cli
# 创建 migrationsea-orm-cli migrate init
# 生成文件:migration/src/m20240101_000001_create_users_table.rsuse sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]pub struct Migration;
#[async_trait::async_trait]impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .create_table( Table::create() .table(User::Table) .if_not_exists() .col(ColumnDef::new(User::Id).integer().not_null().auto_increment().primary_key()) .col(ColumnDef::new(User::Name).string().not_null()) .col(ColumnDef::new(User::Email).string().not_null().unique_key()) .col(ColumnDef::new(User::CreatedAt).timestamp().not_null().default(Expr::current_timestamp())) .to_owned(), ) .await }
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .drop_table(Table::drop().table(User::Table).to_owned()) .await }}
#[derive(Iden)]enum User { Table, Id, Name, Email, CreatedAt,}diesel-cli migration
# 安装cargo install diesel_cli --no-default-features --features postgres
# 设置数据库 URLecho "DATABASE_URL=postgres://user:pass@localhost/db" > .env
# 创建 migrationdiesel migration generate create_users
# 生成文件:# migrations/20240101000000_create_users/up.sql# migrations/20240101000000_create_users/down.sql
# 执行diesel migration run
# 回滚diesel migration revert
# 重置(revert all + run all)diesel database reset踩坑:migration 的生产策略
开发环境:应用启动时自动 migration(sqlx::migrate!)测试环境:每个测试前 reset + migrate生产环境: 1. 不要在应用启动时自动 migration! 2. 用 CI/CD 管道执行 migration 3. migration 必须向后兼容(不改列名、不删列) 4. 大表改结构用零停机迁移(加新列 → 迁移数据 → 删旧列)嵌入式数据库
SQLite via rusqlite
[dependencies]rusqlite = { version = "0.32", features = ["bundled"] }use rusqlite::{Connection, params, Result};
fn main() -> Result<()> { let conn = Connection::open("data.db")?;
conn.execute_batch( "CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL )" )?;
conn.execute( "INSERT INTO users (name, email) VALUES (?1, ?2)", params!["Alice", "alice@example.com"], )?;
let mut stmt = conn.prepare("SELECT id, name, email FROM users")?; let users = stmt.query_map([], |row| { Ok(User { id: row.get(0)?, name: row.get(1)?, email: row.get(2)?, }) })?;
for user in users { println!("{:?}", user?); }
Ok(())}SQLite 异步:sqlx + sqlite
[dependencies]sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] }use sqlx::sqlite::SqlitePoolOptions;
let pool = SqlitePoolOptions::new() .max_connections(5) .connect("sqlite:data.db?mode=rwc") .await?;
sqlx::query!( "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)").execute(&pool).await?;踩坑:SQLite 的 “并发” 限制——写操作加数据库级锁。多写场景下连接池大小设 1 即可(或用 WAL 模式):
// 启用 WAL 模式(允许读写并发)sqlx::query("PRAGMA journal_mode=WAL") .execute(&pool) .await?;Redb:纯 Rust KV 数据库
[dependencies]redb = "2"use redb::{Database, TableDefinition};
const TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("my_data");
let db = Database::create("data.redb")?;
let write_txn = db.begin_write()?;{ let mut table = write_txn.open_table(TABLE)?; table.insert("key1", b"value1")?; table.insert("key2", b"value2")?;}write_txn.commit()?;
let read_txn = db.begin_read()?;let table = read_txn.open_table(TABLE)?;let value = table.get("key1")?.unwrap();assert_eq!(value.value(), b"value1");redb 特点:
- 纯 Rust 实现,无 C 依赖
- ACID 事务
- 零配置,单文件数据库
- 适合嵌入式场景(桌面应用、边缘设备)
- 性能不如 SQLite(但差距不大)
嵌入式数据库选型
| 场景 | 推荐 | 原因 |
|---|---|---|
| 桌面应用本地存储 | SQLite (rusqlite) | 生态成熟、工具丰富 |
| 移动端应用 | SQLite (rusqlite) | iOS/Android 原生支持 |
| 边缘/IoT 设备 | redb | 纯 Rust、无 C 依赖、交叉编译简单 |
| 测试替身 | SQLite (内存模式) | 无需启动数据库服务 |
| 临时缓存 | redb / SQLite | 轻依赖 |
事务与并发模式
基本事务
// SQLxasync fn transfer_funds( pool: &PgPool, from: i32, to: i32, amount: i64,) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?;
let balance: i64 = sqlx::query_scalar!( "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE", from ) .fetch_one(&mut *tx) .await?;
if balance < amount { tx.rollback().await?; return Err(sqlx::Error::RowNotFound); }
sqlx::query!( "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from ) .execute(&mut *tx) .await?;
sqlx::query!( "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to ) .execute(&mut *tx) .await?;
tx.commit().await?; Ok(())}SeaORM 事务
use sea_orm::{DatabaseTransaction, TransactionTrait};
async fn transfer_funds( db: &DatabaseConnection, from: i32, to: i32, amount: i64,) -> Result<(), DbErr> { let txn = db.begin().await?;
let from_account = account::Entity::find_by_id(from) .lock(LockType::Update) .one(&txn) .await? .ok_or(DbErr::Custom("Account not found".into()))?;
if from_account.balance < amount { txn.rollback().await?; return Err(DbErr::Custom("Insufficient funds".into())); }
let mut from_active: account::ActiveModel = from_account.into(); from_active.balance = Set(from_active.balance.unwrap() - amount); from_active.update(&txn).await?;
let to_account = account::Entity::find_by_id(to) .lock(LockType::Update) .one(&txn) .await? .ok_or(DbErr::Custom("Account not found".into()))?;
let mut to_active: account::ActiveModel = to_account.into(); to_active.balance = Set(to_active.balance.unwrap() + amount); to_active.update(&txn).await?;
txn.commit().await?; Ok(())}事务嵌套(SAVEPOINT)
async fn nested_transaction(pool: &PgPool) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?;
sqlx::query!("INSERT INTO logs (msg) VALUES ('outer')") .execute(&mut *tx) .await?;
// 嵌套事务(SAVEPOINT) let mut savepoint = tx.begin().await?;
sqlx::query!("INSERT INTO logs (msg) VALUES ('inner')") .execute(&mut *savepoint) .await?;
// 可以回滚内层而不影响外层 if something_went_wrong { savepoint.rollback().await?; } else { savepoint.commit().await?; }
tx.commit().await?; Ok(())}并发模式:乐观锁
// 乐观锁:用 version 字段检测并发冲突async fn update_user_optimistic( pool: &PgPool, id: i32, name: String, expected_version: i32,) -> Result<bool, sqlx::Error> { let result = sqlx::query!( r#"UPDATE users SET name = $1, version = version + 1 WHERE id = $2 AND version = $3"#, name, id, expected_version ) .execute(pool) .await?;
Ok(result.rows_affected() > 0)}
// 使用match update_user_optimistic(pool, 1, "New Name".into(), 3).await? { true => println!("Updated"), false => println!("Conflict! Someone else modified the record"),}并发模式:SELECT FOR UPDATE 悲观锁
async fn update_user_pessimistic( pool: &PgPool, id: i32, name: String,) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?;
// 加行锁 let user = sqlx::query_as!( User, "SELECT * FROM users WHERE id = $1 FOR UPDATE", id ) .fetch_one(&mut *tx) .await?;
// 此时其他事务无法读取此行(等待锁释放) sqlx::query!( "UPDATE users SET name = $1 WHERE id = $2", name, id ) .execute(&mut *tx) .await?;
tx.commit().await?; Ok(())}踩坑:事务中的连接
// ❌ 事务中混用 pool 和 txasync fn bad(pool: &PgPool) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?;
// 这个查询在 pool 上执行,不在事务中! sqlx::query!("INSERT INTO logs (msg) VALUES ('outside tx')") .execute(pool) // ← 错误! .await?;
// 应该在事务中执行 sqlx::query!("INSERT INTO logs (msg) VALUES ('inside tx')") .execute(&mut *tx) // ← 正确 .await?;
tx.commit().await?; Ok(())}踩坑:长事务阻塞
// ❌ 在事务中做耗时操作async fn bad_long_txn(pool: &PgPool) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?;
// 锁定了行,然后做 30 秒的 HTTP 调用 let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = 1 FOR UPDATE") .fetch_one(&mut *tx) .await?;
let external = call_external_api(&user).await?; // 30 秒! // 期间其他事务全部阻塞
sqlx::query!("UPDATE users SET data = $1 WHERE id = 1", external) .execute(&mut *tx) .await?;
tx.commit().await?; Ok(())}
// ✓ 事务中只做数据库操作async fn good_short_txn(pool: &PgPool) -> Result<(), sqlx::Error> { // 先在事务外做耗时操作 let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = 1") .fetch_one(pool) .await?; let external = call_external_api(&user).await?;
// 事务中只做快速更新 let mut tx = pool.begin().await?; sqlx::query!("UPDATE users SET data = $1 WHERE id = 1", external) .execute(&mut *tx) .await?; tx.commit().await?;
Ok(())}支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!
Rust 2026 经验谈 - 数据库与 ORM 选型
https://tinyzzh.github.io/posts/rust-2026/2026-07-05-rust_2026_035_database_orm/ 相关文章 智能推荐
1
Rust 2026 经验谈 - 序列化与数据格式
Rust serde 生态全览、零拷贝反序列化 rkyv、自定义 Serialize/Deserialize、serde 属性速查、Protocol Buffers 与 FlatBuffers 的 Rust 绑定。
2
Rust 2026 经验谈 - 异步生态选型指南
Rust Tokio vs async-std vs smol 选型对比、executor-less 库设计原则、glommio io_uring 实践、嵌入式异步 embassy、选型决策树。
3
Rust 2026 经验谈 - 声明宏 vs 过程宏选型
Rust 编译时间影响对比、调试难度对比、表达能力边界、何时该用 build script 替代宏、选型决策流程图。
4
Rust 2026 经验谈 - 通道与消息传递
Rust mpsc/mpmc channel 选型、channel 背压策略、select 模式、与 Go channel 的对比、MPSC 生产者-消费者实战。
5
Rust 2026 经验谈 - 异步流与迭代
Rust Stream trait 的实现与用法、async generator 的 nightly 状态、async fn 返回 Stream 的模式、背压控制、Stream 与 Iterator 的对比。
随机文章 随机推荐
TinyZ's Blog