很多开发者一提到持久化工作流,就想到Redis、Postgres、RabbitMQ这些重量级组件。但最近Obelisk项目提出了一个更简单的方案:SQLite就够了。对于大多数场景,你不需要额外的数据库服务,一个本地SQLite文件加上Litestream备份,就能构建可靠的持久化系统。
为什么是SQLite
持久化工作流的核心需求是:工作流状态必须持久保存,计算节点可以随时重建。传统方案通常需要一个数据库(Postgres/MySQL)存储状态、一个消息队列(Redis/RabbitMQ)处理任务、一个编排服务(Airflow/Temporal)协调流程。
但SQLite提供了事务性的持久化状态,而且不需要引入额外的数据库服务。没有网络跳转,没有额外的控制平面,没有新的运维表面。
适用场景
SQLite特别适合以下场景:
- AI Agent工作流:每个Agent有自己的小数据库,独立运行
- 实验性系统:频繁创建和销毁的工作流实例
- 单机部署:不需要分布式能力的小型系统
- 开发和测试:快速搭建原型,无需配置数据库
- 边缘计算:在资源受限的环境中运行
实现方案:SQLite + Litestream
这个方案的核心组件:
- SQLite:存储工作流状态和执行日志
- Litestream:将SQLite变更异步流式复制到S3兼容存储
- 廉价Worker:执行具体任务的计算节点
工作流程:工作流启动,SQLite记录初始状态 → 每个步骤完成后更新SQLite中的状态 → Litestream持续将变更备份到S3 → 如果Worker崩溃,从SQLite恢复状态继续执行 → 如果整个节点丢失,从S3备份恢复最新的SQLite文件。
代码示例
以下是使用Python实现的简单持久化工作流:
import sqlite3, json
from datetime import datetime
class DurableWorkflow:
def __init__(self, db_path):
self.conn = sqlite3.connect(db_path)
self.conn.execute(
"CREATE TABLE IF NOT EXISTS workflow_state ("
"id INTEGER PRIMARY KEY, workflow_id TEXT, step_name TEXT,"
"status TEXT, data JSON, created_at TIMESTAMP, updated_at TIMESTAMP)"
)
self.conn.execute(
"CREATE TABLE IF NOT EXISTS execution_log ("
"id INTEGER PRIMARY KEY, workflow_id TEXT, event_type TEXT,"
"payload JSON, created_at TIMESTAMP)"
)
self.conn.commit()
def start_workflow(self, workflow_id):
self.conn.execute(
"INSERT INTO workflow_state (workflow_id, status, created_at, updated_at) VALUES (?,?,?,?)",
(workflow_id, 'started', datetime.now(), datetime.now())
)
self.log_event(workflow_id, 'workflow_started', {})
self.conn.commit()
def complete_step(self, workflow_id, step_name, result_data):
self.conn.execute(
"UPDATE workflow_state SET step_name=?, status=?, data=?, updated_at=? WHERE workflow_id=?",
(step_name, 'step_completed', json.dumps(result_data), datetime.now(), workflow_id)
)
self.log_event(workflow_id, 'step_completed', {'step': step_name, 'result': result_data})
self.conn.commit()
def get_state(self, workflow_id):
cursor = self.conn.execute(
"SELECT step_name, status, data FROM workflow_state WHERE workflow_id=?",
(workflow_id,)
)
row = cursor.fetchone()
return {'step': row[0], 'status': row[1], 'data': json.loads(row[2])} if row else None
def replay_from_log(self, workflow_id):
cursor = self.conn.execute(
"SELECT event_type, payload FROM execution_log WHERE workflow_id=? ORDER BY id",
(workflow_id,)
)
for event_type, payload in cursor.fetchall():
print(f"Replaying: {event_type} - {json.loads(payload)}")
def log_event(self, workflow_id, event_type, payload):
self.conn.execute(
"INSERT INTO execution_log (workflow_id, event_type, payload, created_at) VALUES (?,?,?,?)",
(workflow_id, event_type, json.dumps(payload), datetime.now())
)
# 使用示例
wf = DurableWorkflow('/tmp/my_workflow.db')
wf.start_workflow('order-123')
wf.complete_step('order-123', 'validate', {'valid': True})
wf.complete_step('order-123', 'process', {'status': 'done'})
wf.replay_from_log('order-123') # 崩溃恢复
Litestream配置
安装和配置Litestream非常简单:
# 安装Litestream
curl -fsSL https://github.com/benbjohnson/litestream/releases/download/v0.3.13/litestream-v0.3.13-linux-amd64.tar.gz | tar xz
sudo mv litestream /usr/local/bin/
# 配置文件 /etc/litestream.yml
dbs:
- path: /var/lib/myapp/workflow.db
replicas:
- url: s3://mybucket/workflow-backups
注意事项
- 异步备份有延迟:Litestream是异步复制,如果SQLite文件在备份前丢失,最新的写入会丢失
- 不适合高可用场景:如果需要99.99%可用性,还是应该用Postgres
- 单机限制:SQLite是单写入者,不适合多节点并发写入
- 容量考虑:SQLite文件不宜过大(建议小于100GB),超过时应考虑分片
总结
对于AI Agent、实验性系统和小型部署来说,SQLite + Litestream是一个被低估的持久化方案。它让你用最少的基础设施获得可靠的持久化能力,同时保持系统的简单性。
本文参考来源:Obelisk – SQLite is all you need for durable workflows
© 版权声明
THE END














暂无评论内容