背景
事情源于memos,一个开源的记事软件。
memos默认使用的是sqlite数据库,部署方便快捷。本着试一试的心态,就这样部署到自己的远程服务器上了。
结果用起来就一发不可收拾了,日常生活中的大事小事,心得体会都顺手就记了上去。
过了一段时间,总觉得如果正式使用,使用sqlite数据库是不太靠谱的,就有心从sqlite数据库切换到postgres数据库。
AI推荐的方法,自然是用pgloader之类的现成工具。但不知道为什么,pgloader加载时会报类型不匹配的错误: invalid input syntax for type bigint: "strftime('%s', 'now')",怎么也处理不了。(从报错内容看,应该是SQLite列的默认值表达式 strftime('%s', 'now') 被原样当成bigint值交给了PostgreSQL。)
折腾了很久,觉得还不如用Python直接把数据读出来再写入postgres数据库来得快,于是请AI生成了以下迁移代码,一次性搞定。
SQLite迁移到postgres
import sqlite3
import psycopg2
from psycopg2 import sql
# ==== Configuration ====
# Path of the SQLite database file to convert; replace with your own file.
SQLITE_DB_PATH = "memos_prod.db"
# Keyword arguments unpacked into psycopg2.connect(); every value below is a
# placeholder to be replaced with the target PostgreSQL connection details.
PG_CONFIG = {
    "dbname": "你的数据库名",
    "user": "用户名",
    "password": "密码",
    "host": "主机名或地址",
    "port": 5432,
}
def map_sqlite_type(sqlite_type):
    """Map a declared SQLite column type to a PostgreSQL column type.

    Matching is a case-insensitive substring test, checked in the order
    INT, CHAR/CLOB/TEXT, REAL/FLOA/DOUB, BLOB, NUMERIC/DECIMAL.

    Args:
        sqlite_type: Declared type text of the column. An empty string or
            ``None`` (a column declared without a type) is accepted.

    Returns:
        The PostgreSQL type name; ``"TEXT"`` when nothing matches.
    """
    # Treat a missing type as "no match" instead of crashing on None.upper().
    t = (sqlite_type or "").upper()
    if "INT" in t:
        return "BIGINT"
    if "CHAR" in t or "CLOB" in t or "TEXT" in t:
        return "TEXT"
    if "REAL" in t or "FLOA" in t or "DOUB" in t:
        return "DOUBLE PRECISION"
    if "BLOB" in t:
        return "BYTEA"
    if "NUMERIC" in t or "DECIMAL" in t:
        return "NUMERIC"
    return "TEXT"  # fallback for unknown or missing types
def _sqlite_ident(name):
    """Return *name* quoted as an SQLite identifier (inner quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _pg_default(dflt_value):
    """Return a DEFAULT expression usable in PostgreSQL for an SQLite default.

    Only SQLite's "current Unix time" idiom is rewritten; every other
    expression is passed through unchanged.
    """
    expr = str(dflt_value).strip()
    # strftime('%s', 'now') is SQLite-specific; emit the epoch equivalent.
    if "strftime" in expr.lower() and "%s" in expr:
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    return expr


def _foreign_key_statements(sqlite_cur, table):
    """Build one ALTER TABLE ... ADD CONSTRAINT statement per foreign key."""
    sqlite_cur.execute(f"PRAGMA foreign_key_list({_sqlite_ident(table)});")
    # Rows: id, seq, table, from, to, on_update, on_delete, match.
    # A composite key spans several rows sharing one id, ordered by seq.
    grouped = {}
    rows = sorted(sqlite_cur.fetchall(), key=lambda row: (row[0], row[1]))
    for fk_id, _seq, ref_table, from_col, to_col, on_update, on_delete, _match in rows:
        fk = grouped.setdefault(
            fk_id,
            {
                "ref": ref_table,
                "from": [],
                "to": [],
                "on_update": on_update,
                "on_delete": on_delete,
            },
        )
        fk["from"].append(from_col)
        fk["to"].append(to_col)

    statements = []
    for fk in grouped.values():
        # "to" may be NULL when the schema says REFERENCES parent without a
        # column list; then let PostgreSQL target the parent's primary key.
        explicit_target = None not in fk["to"]
        name_parts = ["fk", table, *fk["from"], fk["ref"]]
        if explicit_target:
            name_parts.extend(fk["to"])
        stmt = sql.SQL(
            "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}"
        ).format(
            sql.Identifier(table),
            sql.Identifier("_".join(name_parts)),
            sql.SQL(", ").join(map(sql.Identifier, fk["from"])),
            sql.Identifier(fk["ref"]),
        )
        if explicit_target:
            stmt += sql.SQL(" ({})").format(
                sql.SQL(", ").join(map(sql.Identifier, fk["to"]))
            )
        if fk["on_update"] != "NO ACTION":
            stmt += sql.SQL(" ON UPDATE " + fk["on_update"])
        if fk["on_delete"] != "NO ACTION":
            stmt += sql.SQL(" ON DELETE " + fk["on_delete"])
        statements.append(stmt)
    return statements


def migrate(sqlite_db, pg_conn):
    """Copy every user table of an SQLite database into PostgreSQL.

    For each table the structure is created (``CREATE TABLE IF NOT EXISTS``),
    all rows are copied, and its indexes are recreated. Foreign keys are added
    in a second pass, after every table exists and holds its data, so the
    order in which tables are listed does not matter.

    Note: an ``INTEGER PRIMARY KEY`` column becomes a plain ``BIGINT`` primary
    key; no sequence/identity is created for it.

    Args:
        sqlite_db: Open ``sqlite3`` connection to read from.
        pg_conn: Open psycopg2 connection to write to. It is committed after
            each table and once more at the end.
    """
    batch_size = 1000  # rows copied per executemany() call
    sqlite_cur = sqlite_db.cursor()
    pg_cur = pg_conn.cursor()

    # Collect user tables; names starting with "sqlite_" are internal to SQLite.
    sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [
        row[0] for row in sqlite_cur.fetchall() if not row[0].startswith("sqlite_")
    ]

    for table in tables:
        print(f"正在迁移表: {table}")
        tbl = sql.Identifier(table)

        # === Table structure ===
        # PRAGMA table_info rows: cid, name, type, notnull, dflt_value, pk
        sqlite_cur.execute(f"PRAGMA table_info({_sqlite_ident(table)});")
        columns_info = sqlite_cur.fetchall()
        column_names = [col[1] for col in columns_info]

        column_defs = []
        pk_columns = []  # (position inside the key, column name)
        for _cid, name, ctype, notnull, dflt_value, pk in columns_info:
            parts = [sql.Identifier(name), sql.SQL(map_sqlite_type(ctype))]
            if notnull:
                parts.append(sql.SQL("NOT NULL"))
            if dflt_value is not None:
                parts.append(sql.SQL("DEFAULT " + _pg_default(dflt_value)))
            column_defs.append(sql.SQL(" ").join(parts))
            if pk:
                pk_columns.append((pk, name))
        if pk_columns:
            # pk is the 1-based position within the key, so sorting restores
            # the declared order of a composite primary key.
            key_cols = sql.SQL(", ").join(
                sql.Identifier(name) for _pos, name in sorted(pk_columns)
            )
            column_defs.append(sql.SQL("PRIMARY KEY ({})").format(key_cols))
        pg_cur.execute(
            sql.SQL("CREATE TABLE IF NOT EXISTS {} ({});").format(
                tbl, sql.SQL(", ").join(column_defs)
            )
        )

        # === Data ===
        # Column names are quoted exactly as in CREATE TABLE above, so
        # mixed-case and reserved-word names resolve to the same columns.
        insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            tbl,
            sql.SQL(", ").join(map(sql.Identifier, column_names)),
            sql.SQL(", ").join(sql.Placeholder() * len(column_names)),
        )
        sqlite_cur.execute(f"SELECT * FROM {_sqlite_ident(table)};")
        while True:
            batch = sqlite_cur.fetchmany(batch_size)
            if not batch:
                break
            pg_cur.executemany(insert_sql, batch)
        pg_conn.commit()

        # === Indexes ===
        # PRAGMA index_list rows: seq, name, unique, origin, partial
        sqlite_cur.execute(f"PRAGMA index_list({_sqlite_ident(table)});")
        for _seq, idx_name, unique, origin, partial in sqlite_cur.fetchall():
            if origin == "pk":
                # Covered by the PRIMARY KEY constraint in CREATE TABLE.
                continue
            if partial and unique:
                # Without its WHERE clause the index would enforce uniqueness
                # on rows the original index did not cover.
                print(f"⚠️ 跳过唯一部分索引(WHERE 条件无法自动迁移): {idx_name}")
                continue
            sqlite_cur.execute(f"PRAGMA index_info({_sqlite_ident(idx_name)});")
            idx_cols = [row[2] for row in sqlite_cur.fetchall()]
            if not idx_cols:
                continue
            if any(col is None for col in idx_cols):
                # A NULL column name marks an indexed expression.
                print(f"⚠️ 跳过表达式索引(无法自动迁移): {idx_name}")
                continue
            pg_cur.execute(
                sql.SQL("CREATE {}INDEX IF NOT EXISTS {} ON {} ({});").format(
                    sql.SQL("UNIQUE " if unique else ""),
                    sql.Identifier(idx_name),
                    tbl,
                    sql.SQL(", ").join(map(sql.Identifier, idx_cols)),
                )
            )
        pg_conn.commit()

    # === Foreign keys (second pass: every referenced table now exists) ===
    for table in tables:
        for stmt in _foreign_key_statements(sqlite_cur, table):
            pg_cur.execute(stmt)
    pg_conn.commit()
    print("✅ 数据、主键、索引、外键迁移完成!")
if __name__ == "__main__":
    # Open the SQLite source first.  The nested try/finally blocks guarantee
    # it is closed even when the PostgreSQL connection cannot be established.
    sqlite_db = sqlite3.connect(SQLITE_DB_PATH)
    try:
        sqlite_db.execute("PRAGMA foreign_keys=ON;")
        # Connect to the PostgreSQL target.
        pg_conn = psycopg2.connect(**PG_CONFIG)
        try:
            migrate(sqlite_db, pg_conn)
        finally:
            pg_conn.close()
    finally:
        sqlite_db.close()
只需要将SQLite数据库和postgres数据库换成自己相应的数据库就可以了。当然,不要忘记安装PostgreSQL相关的依赖。
1.安装依赖
pip install psycopg2-binary
2.运行脚本
python sqlite_to_postgres.py
交互式迁移
当然,再通用一些,可以把自定义的内容直接通过交互界面输入。代码如下:
import sqlite3
import psycopg2
from psycopg2 import sql
# ==== Configuration ====
# Path of the SQLite database file to convert.
SQLITE_DB_PATH = "memos_prod.db"
# Keyword arguments unpacked into psycopg2.connect().  "user" and "password"
# are placeholders on purpose: fill in your own values locally and never
# publish or commit real credentials.
PG_CONFIG = {
    "dbname": "memos",
    "user": "用户名",
    "password": "密码",
    "host": "localhost",
    "port": 5432,
}
def map_sqlite_type(sqlite_type):
    """Translate a declared SQLite column type into a PostgreSQL type name.

    The declared type is upper-cased and tested against the keyword groups
    below in order; the first group with a keyword contained in the type
    wins. A missing type or one matching no group yields ``"TEXT"``.
    """
    declared = (sqlite_type or "").upper()
    rules = (
        (("INT",), "BIGINT"),
        (("CHAR", "CLOB", "TEXT"), "TEXT"),
        (("REAL", "FLOA", "DOUB"), "DOUBLE PRECISION"),
        (("BLOB",), "BYTEA"),
        (("NUMERIC", "DECIMAL"), "NUMERIC"),
    )
    for keywords, pg_type in rules:
        if any(keyword in declared for keyword in keywords):
            return pg_type
    return "TEXT"  # default
def convert_default_value(dflt_value, pg_type):
    """Convert an SQLite column default into a PostgreSQL-compatible one.

    Args:
        dflt_value: Default expression text of the column, or ``None`` when
            the column has no default.
        pg_type: Target PostgreSQL type of the column (currently unused).

    Returns:
        SQL text to place after ``DEFAULT``, or ``None`` when there is no
        default or the expression is not recognised (it is then dropped so
        that table creation does not fail).
    """
    if dflt_value is None:
        return None
    # Remove surrounding whitespace and any wrapping parentheses.
    val = str(dflt_value).strip().strip("()").strip()
    # SQLite time function: strftime('%s','now') => current Unix timestamp.
    if "strftime" in val.lower():
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    if val.upper() in ("CURRENT_TIMESTAMP", "NOW"):
        return "CURRENT_TIMESTAMP"
    # String literal; require both quotes so a lone "'" is not emitted.
    if len(val) >= 2 and val.startswith("'") and val.endswith("'"):
        return val
    # Numeric literal, optionally signed, so defaults like -1 are kept.
    unsigned = val[1:] if val.startswith(("+", "-")) else val
    if unsigned.replace(".", "", 1).isdigit():
        return val
    # Anything else is dropped to avoid an error in CREATE TABLE.
    return None
def _sqlite_ident(name):
    """Return *name* quoted as an SQLite identifier (inner quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _foreign_key_statements(sqlite_cur, table):
    """Build one ALTER TABLE ... ADD CONSTRAINT statement per foreign key."""
    sqlite_cur.execute(f"PRAGMA foreign_key_list({_sqlite_ident(table)});")
    # Rows: id, seq, table, from, to, on_update, on_delete, match.
    # A composite key spans several rows sharing one id, ordered by seq.
    grouped = {}
    rows = sorted(sqlite_cur.fetchall(), key=lambda row: (row[0], row[1]))
    for fk_id, _seq, ref_table, from_col, to_col, on_update, on_delete, _match in rows:
        fk = grouped.setdefault(
            fk_id,
            {
                "ref": ref_table,
                "from": [],
                "to": [],
                "on_update": on_update,
                "on_delete": on_delete,
            },
        )
        fk["from"].append(from_col)
        fk["to"].append(to_col)

    statements = []
    for fk in grouped.values():
        # "to" may be NULL when the schema says REFERENCES parent without a
        # column list; then let PostgreSQL target the parent's primary key.
        explicit_target = None not in fk["to"]
        name_parts = ["fk", table, *fk["from"], fk["ref"]]
        if explicit_target:
            name_parts.extend(fk["to"])
        stmt = sql.SQL(
            "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}"
        ).format(
            sql.Identifier(table),
            sql.Identifier("_".join(name_parts)),
            sql.SQL(", ").join(map(sql.Identifier, fk["from"])),
            sql.Identifier(fk["ref"]),
        )
        if explicit_target:
            stmt += sql.SQL(" ({})").format(
                sql.SQL(", ").join(map(sql.Identifier, fk["to"]))
            )
        if fk["on_update"] != "NO ACTION":
            stmt += sql.SQL(" ON UPDATE " + fk["on_update"])
        if fk["on_delete"] != "NO ACTION":
            stmt += sql.SQL(" ON DELETE " + fk["on_delete"])
        statements.append(stmt)
    return statements


def migrate(sqlite_db, pg_conn):
    """Copy every user table of an SQLite database into PostgreSQL.

    For each table the structure is created (``CREATE TABLE IF NOT EXISTS``),
    all rows are copied, and its indexes are recreated. Foreign keys are added
    in a second pass, after every table exists and holds its data; a
    constraint that already exists is skipped.

    Note: an ``INTEGER PRIMARY KEY`` column becomes a plain ``BIGINT`` primary
    key; no sequence/identity is created for it.

    Args:
        sqlite_db: Open ``sqlite3`` connection to read from.
        pg_conn: Open psycopg2 connection to write to. It is committed after
            each table and after each foreign key.
    """
    batch_size = 1000  # rows copied per executemany() call
    sqlite_cur = sqlite_db.cursor()
    pg_cur = pg_conn.cursor()

    # Collect user tables; names starting with "sqlite_" are internal to SQLite.
    sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [
        row[0] for row in sqlite_cur.fetchall() if not row[0].startswith("sqlite_")
    ]

    for table in tables:
        print(f"正在迁移表: {table}")
        tbl = sql.Identifier(table)

        # === Table structure ===
        # PRAGMA table_info rows: cid, name, type, notnull, dflt_value, pk
        sqlite_cur.execute(f"PRAGMA table_info({_sqlite_ident(table)});")
        columns_info = sqlite_cur.fetchall()

        column_defs = []
        column_names = []
        pk_columns = []  # (position inside the key, column name)
        for _cid, name, ctype, notnull, dflt_value, pk in columns_info:
            pg_type = map_sqlite_type(ctype)
            parts = [sql.Identifier(name), sql.SQL(pg_type)]
            if notnull:
                parts.append(sql.SQL("NOT NULL"))
            converted_default = convert_default_value(dflt_value, pg_type)
            if converted_default:
                parts.append(sql.SQL("DEFAULT " + converted_default))
            column_defs.append(sql.SQL(" ").join(parts))
            column_names.append(name)
            if pk:
                pk_columns.append((pk, name))
        if pk_columns:
            # pk is the 1-based position within the key, so sorting restores
            # the declared order of a composite primary key.
            key_cols = sql.SQL(", ").join(
                sql.Identifier(name) for _pos, name in sorted(pk_columns)
            )
            column_defs.append(sql.SQL("PRIMARY KEY ({})").format(key_cols))
        pg_cur.execute(
            sql.SQL("CREATE TABLE IF NOT EXISTS {} ({});").format(
                tbl, sql.SQL(", ").join(column_defs)
            )
        )

        # === Data ===
        # Column names are quoted exactly as in CREATE TABLE above, so
        # mixed-case and reserved-word names resolve to the same columns.
        insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            tbl,
            sql.SQL(", ").join(map(sql.Identifier, column_names)),
            sql.SQL(", ").join(sql.Placeholder() * len(column_names)),
        )
        sqlite_cur.execute(f"SELECT * FROM {_sqlite_ident(table)};")
        while True:
            batch = sqlite_cur.fetchmany(batch_size)
            if not batch:
                break
            pg_cur.executemany(insert_sql, batch)
        pg_conn.commit()

        # === Indexes ===
        # PRAGMA index_list rows: seq, name, unique, origin, partial
        sqlite_cur.execute(f"PRAGMA index_list({_sqlite_ident(table)});")
        for _seq, idx_name, unique, origin, partial in sqlite_cur.fetchall():
            if origin == "pk":
                # Covered by the PRIMARY KEY constraint in CREATE TABLE.
                continue
            if partial and unique:
                # Without its WHERE clause the index would enforce uniqueness
                # on rows the original index did not cover.
                print(f"⚠️ 跳过唯一部分索引(WHERE 条件无法自动迁移): {idx_name}")
                continue
            sqlite_cur.execute(f"PRAGMA index_info({_sqlite_ident(idx_name)});")
            idx_cols = [row[2] for row in sqlite_cur.fetchall()]
            if not idx_cols:
                continue
            if any(col is None for col in idx_cols):
                # A NULL column name marks an indexed expression.
                print(f"⚠️ 跳过表达式索引(无法自动迁移): {idx_name}")
                continue
            pg_cur.execute(
                sql.SQL("CREATE {}INDEX IF NOT EXISTS {} ON {} ({});").format(
                    sql.SQL("UNIQUE " if unique else ""),
                    sql.Identifier(idx_name),
                    tbl,
                    sql.SQL(", ").join(map(sql.Identifier, idx_cols)),
                )
            )
        # Commit now so a later rollback (duplicate foreign key) cannot undo
        # this table's indexes.
        pg_conn.commit()

    # === Foreign keys (second pass: every referenced table now exists) ===
    for table in tables:
        for stmt in _foreign_key_statements(sqlite_cur, table):
            try:
                pg_cur.execute(stmt)
            except psycopg2.errors.DuplicateObject:
                # Constraint already exists: skip it.  Only the failed
                # statement is pending, so the rollback loses nothing else.
                pg_conn.rollback()
            else:
                pg_conn.commit()
    pg_conn.commit()
    print("✅ 数据、主键、索引、外键迁移完成!")
if __name__ == "__main__":
    # Open the SQLite source first.  The nested try/finally blocks guarantee
    # it is closed even when the PostgreSQL connection cannot be established.
    sqlite_db = sqlite3.connect(SQLITE_DB_PATH)
    try:
        sqlite_db.execute("PRAGMA foreign_keys=ON;")
        # Connect to the PostgreSQL target.
        pg_conn = psycopg2.connect(**PG_CONFIG)
        try:
            migrate(sqlite_db, pg_conn)
        finally:
            pg_conn.close()
    finally:
        sqlite_db.close()
这样,运行代码,就可以交互式地运行迁移脚本了。(注意:上面这段代码的连接信息仍然写在 PG_CONFIG 里;下面展示的交互提示,对应的是后文"加参数迁移"脚本中 main() 里的 input() 调用。)
1.安装依赖
pip install psycopg2-binary
2.运行脚本
python sqlite_to_postgres.py
=== SQLite -> PostgreSQL 数据迁移工具 ===
请输入 SQLite 数据库文件路径 (例如: ./memos_prod.db):
请输入 PostgreSQL 数据库名称:
请输入 PostgreSQL 用户名:
请输入 PostgreSQL 密码:
请输入 PostgreSQL 主机 (默认: localhost):
请输入 PostgreSQL 端口 (默认: 5432):
加参数迁移
#!/usr/bin/env python3
import argparse
import sqlite3
import psycopg2
from psycopg2 import sql
import getpass
import json
# ================= 工具函数 =================
def convert_type(ctype: str):
    """Map a declared SQLite column type to a PostgreSQL column type.

    Matching is a case-insensitive substring test, checked in the order
    INT, CHAR/CLOB/TEXT, BLOB, REAL/FLOA/DOUB, NUMERIC/DECIMAL, BOOL, JSON.

    Args:
        ctype: Declared type text of the column. An empty string or ``None``
            (a column declared without a type) is accepted.

    Returns:
        The PostgreSQL type name; ``"TEXT"`` when nothing matches.
    """
    # Treat a missing type as "no match" instead of crashing on None.upper().
    ctype = (ctype or "").upper()
    if "INT" in ctype:
        return "BIGINT"
    if "CHAR" in ctype or "CLOB" in ctype or "TEXT" in ctype:
        return "TEXT"
    if "BLOB" in ctype:
        return "BYTEA"
    if "REAL" in ctype or "FLOA" in ctype or "DOUB" in ctype:
        return "DOUBLE PRECISION"
    # Keep exact numerics numeric rather than degrading them to TEXT.
    if "NUMERIC" in ctype or "DECIMAL" in ctype:
        return "NUMERIC"
    if "BOOL" in ctype:
        return "BOOLEAN"
    if "JSON" in ctype:
        return "JSONB"
    return "TEXT"
def convert_default_value(dflt_value, pg_type):
    """Convert an SQLite column default into a PostgreSQL-compatible one.

    Args:
        dflt_value: Default expression text of the column, or ``None`` when
            the column has no default.
        pg_type: Target PostgreSQL type of the column. For ``"BOOLEAN"``,
            numeric and TRUE/FALSE defaults are rewritten as boolean
            literals, because an integer default on a boolean column is
            rejected.

    Returns:
        SQL text to place after ``DEFAULT``, or ``None`` when there is no
        default or the expression is not recognised (it is then skipped).
    """
    if dflt_value is None:
        return None
    # Remove surrounding whitespace and any wrapping parentheses.
    val = str(dflt_value).strip().strip("()").strip()
    # Numeric literal, optionally signed, so defaults like -1 are kept.
    unsigned = val[1:] if val.startswith(("+", "-")) else val
    is_number = unsigned.replace(".", "", 1).isdigit()

    if (pg_type or "").upper() == "BOOLEAN":
        if val.upper() in ("TRUE", "FALSE"):
            return val.upper()
        if is_number:
            # SQLite treats any non-zero number as true.
            return "TRUE" if float(val) != 0 else "FALSE"

    # Timestamps: strftime('%s','now') => current Unix timestamp.
    if "strftime" in val.lower():
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    if val.upper() in ("CURRENT_TIMESTAMP", "NOW"):
        return "CURRENT_TIMESTAMP"
    # String literal; require both quotes so a lone "'" is not emitted.
    if len(val) >= 2 and val.startswith("'") and val.endswith("'"):
        return val
    if is_number:
        return val
    return None  # unrecognised defaults are skipped
# Text spellings treated as boolean false (compared after strip() + lower()).
_FALSE_STRINGS = frozenset({"", "0", "f", "false", "n", "no", "off"})


def adapt_row_for_postgres(row, column_names, col_types):
    """Convert one SQLite row so its values fit the PostgreSQL column types.

    Args:
        row: Sequence of values, in the same order as ``column_names``.
        column_names: Column names of the row.
        col_types: Mapping of column name to PostgreSQL type name; the names
            tested are the lower-case ``"boolean"``, ``"json"``, ``"jsonb"``
            and ``"bytea"``. Columns missing from the mapping pass through
            unchanged.

    Returns:
        A tuple with the converted values. ``None`` is always kept as-is.
    """
    new_row = []
    for col, val in zip(column_names, row):
        pg_type = col_types.get(col)
        if val is None:
            new_row.append(None)
        elif pg_type == "boolean":
            if isinstance(val, str):
                # bool("false") and bool("0") would both be True.
                new_row.append(val.strip().lower() not in _FALSE_STRINGS)
            else:
                new_row.append(bool(val))  # 0/1 -> False/True
        elif pg_type in ("json", "jsonb"):
            if isinstance(val, (bytes, bytearray, memoryview)):
                # JSON stored as a blob: json.dumps() cannot serialise bytes.
                val = bytes(val).decode("utf-8")
            if isinstance(val, str):
                try:
                    json.loads(val)  # already a JSON document?
                    new_row.append(val)
                except json.JSONDecodeError:
                    # Plain text: store it as a JSON string.
                    new_row.append(json.dumps(val))
            else:
                new_row.append(json.dumps(val))
        elif pg_type == "bytea":
            if isinstance(val, memoryview):
                new_row.append(bytes(val))
            elif isinstance(val, str):
                # Send text as bytes so it is not parsed as a bytea literal.
                new_row.append(val.encode("utf-8"))
            else:
                new_row.append(val)
        else:
            new_row.append(val)
    return tuple(new_row)
# ================= 核心迁移函数 =================
def _sqlite_ident(name):
    """Return *name* quoted as an SQLite identifier (inner quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def migrate(sqlite_db, pg_conn, schema_only=False, data_only=False,
            include_tables=None, exclude_tables=None, if_exists="skip"):
    """Copy tables from an SQLite database file into PostgreSQL.

    Args:
        sqlite_db: Path of the SQLite database file; it is opened here and
            always closed again, even on error.
        pg_conn: Open psycopg2 connection to write to; committed after each
            step.
        schema_only: Create the tables but copy no rows.
        data_only: Copy rows without issuing ``CREATE TABLE``.
        include_tables: If given, only tables named here are migrated.
        exclude_tables: If given, tables named here are skipped.
        if_exists: ``"skip"`` ignores conflicting rows, ``"update"``
            overwrites the non-key columns of rows whose primary key already
            exists, and any other value (``"replace"``) truncates the target
            table before loading.
    """
    sqlite_conn = sqlite3.connect(sqlite_db)
    try:
        sqlite_cur = sqlite_conn.cursor()
        pg_cur = pg_conn.cursor()

        # Collect user tables; names starting with "sqlite_" are internal.
        sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [
            r[0] for r in sqlite_cur.fetchall() if not r[0].startswith("sqlite_")
        ]
        if include_tables:
            tables = [t for t in tables if t in include_tables]
        if exclude_tables:
            tables = [t for t in tables if t not in exclude_tables]

        for table in tables:
            print(f"正在迁移表: {table}")
            tbl = sql.Identifier(table)

            # Column info rows: (cid, name, type, notnull, dflt_value, pk)
            sqlite_cur.execute(f"PRAGMA table_info({_sqlite_ident(table)});")
            columns_info = sqlite_cur.fetchall()
            column_names = [c[1] for c in columns_info]
            # pk is the 1-based position within the key, so sorting restores
            # the declared order of a composite primary key.
            pk_names = [
                name
                for _pos, name in sorted((c[5], c[1]) for c in columns_info if c[5])
            ]

            # Only a single-column INTEGER/INT primary key becomes BIGSERIAL.
            declared = {c[1]: (c[2] or "").upper() for c in columns_info}
            serial_column = None
            if len(pk_names) == 1 and declared[pk_names[0]] in ("INTEGER", "INT"):
                serial_column = pk_names[0]

            # Create the table.
            if not data_only:
                column_defs = []
                for _cid, name, ctype, notnull, dflt_value, _pk in columns_info:
                    if name == serial_column:
                        column_defs.append(
                            sql.SQL("{} BIGSERIAL").format(sql.Identifier(name))
                        )
                        continue
                    pg_type = convert_type(ctype)
                    parts = [sql.Identifier(name), sql.SQL(pg_type)]
                    if notnull:
                        parts.append(sql.SQL("NOT NULL"))
                    default_val = convert_default_value(dflt_value, pg_type)
                    if default_val:
                        parts.append(sql.SQL("DEFAULT " + default_val))
                    column_defs.append(sql.SQL(" ").join(parts))
                if pk_names:
                    # Exactly one PRIMARY KEY clause per table: PostgreSQL
                    # rejects a table that declares more than one.
                    column_defs.append(
                        sql.SQL("PRIMARY KEY ({})").format(
                            sql.SQL(", ").join(map(sql.Identifier, pk_names))
                        )
                    )
                pg_cur.execute(
                    sql.SQL("CREATE TABLE IF NOT EXISTS {} ({});").format(
                        tbl, sql.SQL(", ").join(column_defs)
                    )
                )
                pg_conn.commit()

            # Copy the rows.
            if not schema_only:
                sqlite_cur.execute(f"SELECT * FROM {_sqlite_ident(table)};")
                rows = sqlite_cur.fetchall()

                if if_exists not in ("skip", "update"):  # replace
                    # Truncate even when the source is empty, so the target
                    # ends up mirroring the source.
                    pg_cur.execute(
                        sql.SQL("TRUNCATE TABLE {} RESTART IDENTITY CASCADE;").format(tbl)
                    )

                if rows:
                    # Look up the PostgreSQL column types, restricted to the
                    # schemas on the search path so a same-named table in
                    # another schema cannot leak in.
                    pg_cur.execute(
                        "SELECT column_name, data_type "
                        "FROM information_schema.columns "
                        "WHERE table_name = %s "
                        "AND table_schema::text = ANY (current_schemas(false)::text[]);",
                        (table,),
                    )
                    col_types = {row[0]: row[1] for row in pg_cur.fetchall()}
                    converted_rows = [
                        adapt_row_for_postgres(row, column_names, col_types)
                        for row in rows
                    ]

                    insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                        tbl,
                        sql.SQL(", ").join(map(sql.Identifier, column_names)),
                        sql.SQL(", ").join(sql.Placeholder() * len(column_names)),
                    )
                    if if_exists == "skip":
                        insert_sql += sql.SQL(" ON CONFLICT DO NOTHING")
                    elif if_exists == "update":
                        non_key = [c for c in column_names if c not in pk_names]
                        if pk_names and non_key:
                            insert_sql += sql.SQL(
                                " ON CONFLICT ({}) DO UPDATE SET {}"
                            ).format(
                                sql.SQL(", ").join(map(sql.Identifier, pk_names)),
                                sql.SQL(", ").join(
                                    sql.SQL("{0} = EXCLUDED.{0}").format(sql.Identifier(c))
                                    for c in non_key
                                ),
                            )
                        else:
                            # No primary key to match on, or nothing but key
                            # columns to update.
                            print(f"⚠️ 表 {table} 无法按主键更新, 冲突的行将被跳过")
                            insert_sql += sql.SQL(" ON CONFLICT DO NOTHING")
                    pg_cur.executemany(insert_sql, converted_rows)
                pg_conn.commit()

                # Move the serial sequence past the copied ids.  The sequence
                # is looked up, not guessed from a naming pattern; the third
                # setval() argument keeps an empty table starting at 1.
                if serial_column:
                    try:
                        pg_cur.execute(
                            sql.SQL(
                                "SELECT setval(pg_get_serial_sequence(%s, %s), "
                                "COALESCE(MAX({col}), 1), MAX({col}) IS NOT NULL) "
                                "FROM {tbl}"
                            ).format(col=sql.Identifier(serial_column), tbl=tbl),
                            (tbl.as_string(pg_conn), serial_column),
                        )
                        pg_conn.commit()
                    except psycopg2.Error as e:
                        # Leave the aborted transaction so later tables still work.
                        pg_conn.rollback()
                        print(f"⚠️ 更新序列失败: 表 {table}, 列 {serial_column}, 错误: {e}")
    finally:
        sqlite_conn.close()
# ================= 主程序入口 =================
def _split_table_args(values):
    """Flatten table-name arguments, accepting both ``a b`` and ``a,b``."""
    if not values:
        return None
    return [
        name.strip()
        for value in values
        for name in value.split(",")
        if name.strip()
    ]


def main():
    """Parse the command line, prompt for connection details and migrate.

    Command-line flags choose what is migrated (structure, data, tables,
    conflict handling); the SQLite path and the PostgreSQL connection details
    are asked for interactively. The PostgreSQL connection is always closed,
    also when the migration fails.
    """
    import os  # local import: only needed for the input-file check below

    parser = argparse.ArgumentParser(description="SQLite → PostgreSQL 数据迁移工具")
    # Both flags together would migrate nothing, so reject the combination.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--schema-only", action="store_true", help="仅迁移表结构")
    mode.add_argument("--data-only", action="store_true", help="仅迁移表数据")
    parser.add_argument("--tables", nargs="+", help="仅迁移指定表")
    parser.add_argument("--exclude-tables", nargs="+", help="排除指定表")
    parser.add_argument("--if-exists", choices=["skip", "update", "replace"], default="skip",
                        help="遇到冲突时的处理方式")
    args = parser.parse_args()

    print("=== SQLite -> PostgreSQL 数据迁移工具 ===")
    sqlite_db = input("请输入 SQLite 数据库文件路径 (例如: ./memos_prod.db): ").strip()
    # sqlite3.connect() would silently create an empty database for a wrong
    # path, and the migration would then "succeed" without copying anything.
    if not os.path.isfile(sqlite_db):
        raise SystemExit(f"找不到 SQLite 数据库文件: {sqlite_db}")
    pg_db = input("请输入 PostgreSQL 数据库名称: ").strip()
    pg_user = input("请输入 PostgreSQL 用户名: ").strip()
    pg_password = getpass.getpass("请输入 PostgreSQL 密码: ")
    pg_host = input("请输入 PostgreSQL 主机 (默认: localhost): ").strip() or "localhost"
    pg_port = input("请输入 PostgreSQL 端口 (默认: 5432): ").strip() or "5432"

    pg_conn = psycopg2.connect(
        dbname=pg_db, user=pg_user, password=pg_password, host=pg_host, port=pg_port
    )
    try:
        migrate(
            sqlite_db,
            pg_conn,
            schema_only=args.schema_only,
            data_only=args.data_only,
            # Accept "--tables a b" as well as "--tables a,b".
            include_tables=_split_table_args(args.tables),
            exclude_tables=_split_table_args(args.exclude_tables),
            if_exists=args.if_exists,
        )
    finally:
        pg_conn.close()
    print("=== 数据迁移完成 ===")


if __name__ == "__main__":
    main()
全库迁移(结构 + 数据)
python sqlite_to_postgres.py
只迁移结构
python sqlite_to_postgres.py --schema-only
只迁移数据
python sqlite_to_postgres.py --data-only
只迁移某几个表(例如 users 和 tasks,多个表名之间用空格分隔)
python sqlite_to_postgres.py --tables users tasks
排除日志表
python sqlite_to_postgres.py --exclude-tables logs audit_trail
只迁移 users 和 tasks,但排除 tasks(最终只会迁移 users)
python sqlite_to_postgres.py --tables users tasks --exclude-tables tasks
只迁移数据,遇到冲突时更新
冲突处理模式(--if-exists skip|update|replace),默认是 skip
python sqlite_to_postgres.py --data-only --if-exists update
后记
以上几个脚本,只有第一个是亲测有效。交互式迁移与加参数迁移都未曾验证,有用到的同学遇到任何问题,欢迎留言交流。