背景

事情源于memos,一个开源的记事软件。

memos默认使用的是sqlite数据库,部署方便快捷。本着试一试的心态,就这样部署到自己的远程服务器上了。

结果用起来就一发不可收拾了,日常生活中的大事小事,心得体会都顺手就记了上去。

过了一段时间,总觉得如果正式使用,使用sqlite数据库是不太靠谱的,就有心从sqlite数据库切换到postgres数据库。

用AI推荐的方法,当然是用现成的工具pgloader之类的。但不知道为什么,pgloader加载后会提示类型不匹配的告警: invalid input syntax for type bigint: "strftime('%s', 'now')" ,怎么也处理不了。折腾了很久,就觉得还不如用python直接读取出来再写入到postgres数据库来得快些,于是请AI生成了以下迁移代码,一次性搞定。

SQLite迁移到postgres

import sqlite3
import psycopg2
from psycopg2 import sql

# ==== Configuration ====
# Path of the SQLite database file to convert; point it at your own file.
SQLITE_DB_PATH = "memos_prod.db"

# Keyword arguments handed to psycopg2.connect(); replace the placeholders
# with the real connection settings of the target PostgreSQL server.
PG_CONFIG = dict(
    dbname="你的数据库名",
    user="用户名",
    password="密码",
    host="主机名或地址",
    port=5432,
)


def map_sqlite_type(sqlite_type):
    """Map a SQLite declared column type to a PostgreSQL column type.

    Matching is done by case-insensitive substring, checked in this order:
    INT, CHAR/CLOB/TEXT, REAL/FLOA/DOUB, BLOB, NUMERIC/DECIMAL.

    Args:
        sqlite_type: Declared type text of the column. ``None`` or an empty
            string (a column declared without a type) is accepted.

    Returns:
        The PostgreSQL type name; ``"TEXT"`` when nothing matches.
    """
    # A missing declared type must not crash the migration: treat it as "".
    declared = (sqlite_type or "").upper()
    if "INT" in declared:
        return "BIGINT"
    if any(tag in declared for tag in ("CHAR", "CLOB", "TEXT")):
        return "TEXT"
    if any(tag in declared for tag in ("REAL", "FLOA", "DOUB")):
        return "DOUBLE PRECISION"
    if "BLOB" in declared:
        return "BYTEA"
    if "NUMERIC" in declared or "DECIMAL" in declared:
        return "NUMERIC"
    return "TEXT"  # fallback for unknown or missing types


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _translate_default(dflt_value):
    """Return a column default that PostgreSQL can parse.

    Only the SQLite-specific epoch idiom ``strftime('%s', 'now')`` is
    rewritten; every other default expression is passed through unchanged.
    """
    text = str(dflt_value)
    if "strftime" in text.lower() and "%s" in text:
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    return text


def migrate(sqlite_db, pg_conn):
    """Copy every user table from SQLite into PostgreSQL.

    For each table the structure (columns, NOT NULL, defaults, primary key)
    is created, all rows are inserted and the indexes are recreated. Foreign
    keys are added only after every table exists, so a constraint never
    references a table that has not been created yet.

    Args:
        sqlite_db: Open ``sqlite3`` connection to read from.
        pg_conn: Open ``psycopg2`` connection to write to.

    Raises:
        Any ``sqlite3``/``psycopg2`` error is propagated; work committed for
        earlier tables stays in PostgreSQL.
    """
    sqlite_cur = sqlite_db.cursor()
    pg_cur = pg_conn.cursor()

    # Every "sqlite_*" table is SQLite-internal bookkeeping, not user data.
    sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [row[0] for row in sqlite_cur.fetchall() if not row[0].startswith("sqlite_")]

    pending_fks = []  # (table, fk description) pairs, applied after the loop

    for table in tables:
        print(f"正在迁移表: {table}")
        q_table = _quote_ident(table)

        # === Table structure ===
        # PRAGMA table_info rows: cid, name, type, notnull, dflt_value, pk
        sqlite_cur.execute(f"PRAGMA table_info({q_table});")
        columns_info = sqlite_cur.fetchall()

        column_defs = []
        pk_members = []
        for _cid, name, ctype, notnull, dflt_value, pk in columns_info:
            col_def = f"{_quote_ident(name)} {map_sqlite_type(ctype)}"
            if notnull:
                col_def += " NOT NULL"
            if dflt_value is not None:
                col_def += f" DEFAULT {_translate_default(dflt_value)}"
            column_defs.append(col_def)
            if pk:
                # pk is the 1-based position of the column inside the key.
                pk_members.append((pk, _quote_ident(name)))

        if pk_members:
            pk_sql = ", ".join(col for _pos, col in sorted(pk_members))
            column_defs.append(f"PRIMARY KEY ({pk_sql})")

        pg_cur.execute(f"CREATE TABLE IF NOT EXISTS {q_table} ({', '.join(column_defs)});")

        # === Rows ===
        sqlite_cur.execute(f"SELECT * FROM {q_table};")
        rows = sqlite_cur.fetchall()
        if rows:
            # Quote the column names exactly like CREATE TABLE did; unquoted
            # reserved words or mixed-case names would not match.
            col_list = ", ".join(_quote_ident(col[1]) for col in columns_info)
            placeholders = ", ".join(["%s"] * len(columns_info))
            pg_cur.executemany(
                f"INSERT INTO {q_table} ({col_list}) VALUES ({placeholders})", rows
            )

        pg_conn.commit()

        # === Indexes ===
        # PRAGMA index_list rows: seq, name, unique, origin, partial
        sqlite_cur.execute(f"PRAGMA index_list({q_table});")
        for _seq, idx_name, unique, origin, partial in sqlite_cur.fetchall():
            if origin == "pk":
                continue  # PostgreSQL already indexes the primary key
            sqlite_cur.execute(f"PRAGMA index_info({_quote_ident(idx_name)});")
            idx_cols = [row[2] for row in sqlite_cur.fetchall()]
            if not idx_cols:
                continue
            if partial or any(col is None for col in idx_cols):
                # The WHERE clause / expression is not exposed by these
                # PRAGMAs, so the index cannot be rebuilt faithfully.
                print(f"⚠️ 跳过索引 {idx_name}: 部分索引/表达式索引无法自动迁移")
                continue
            unique_sql = "UNIQUE " if unique else ""
            cols_sql = ", ".join(_quote_ident(col) for col in idx_cols)
            pg_cur.execute(
                f"CREATE {unique_sql}INDEX IF NOT EXISTS {_quote_ident(idx_name)} "
                f"ON {q_table} ({cols_sql});"
            )

        # === Foreign keys (collected now, created later) ===
        # PRAGMA foreign_key_list rows:
        # id, seq, table, from, to, on_update, on_delete, match
        sqlite_cur.execute(f"PRAGMA foreign_key_list({q_table});")
        grouped = {}
        for fk_id, seq, ref_table, from_col, to_col, on_update, on_delete, _match in sqlite_cur.fetchall():
            entry = grouped.setdefault(
                fk_id,
                {"ref": ref_table, "on_update": on_update, "on_delete": on_delete, "cols": []},
            )
            # Rows sharing one id are the columns of one composite key.
            entry["cols"].append((seq, from_col, to_col))
        pending_fks.extend((table, entry) for entry in grouped.values())

        pg_conn.commit()

    for table, fk in pending_fks:
        pairs = sorted(fk["cols"], key=lambda item: item[0])
        from_cols = [from_col for _seq, from_col, _to in pairs]
        to_cols = [to_col for _seq, _from, to_col in pairs]
        name_parts = [table, *from_cols, fk["ref"], *[col for col in to_cols if col is not None]]
        fk_name = "fk_" + "_".join(name_parts)
        fk_sql = (
            f"ALTER TABLE {_quote_ident(table)} ADD CONSTRAINT {_quote_ident(fk_name)} "
            f"FOREIGN KEY ({', '.join(_quote_ident(col) for col in from_cols)}) "
            f"REFERENCES {_quote_ident(fk['ref'])}"
        )
        # "to" is NULL when the constraint targets the parent's primary key
        # implicitly; PostgreSQL uses the same convention when the column
        # list is omitted.
        if all(col is not None for col in to_cols):
            fk_sql += f" ({', '.join(_quote_ident(col) for col in to_cols)})"
        if fk["on_update"] != "NO ACTION":
            fk_sql += f" ON UPDATE {fk['on_update']}"
        if fk["on_delete"] != "NO ACTION":
            fk_sql += f" ON DELETE {fk['on_delete']}"
        pg_cur.execute(fk_sql)

    pg_conn.commit()

    print("✅ 数据、主键、索引、外键迁移完成!")


if __name__ == "__main__":
    # Open the SQLite source first; the nested try/finally blocks guarantee
    # that each connection is closed even when a later step raises
    # (for example when the PostgreSQL connection cannot be established).
    sqlite_db = sqlite3.connect(SQLITE_DB_PATH)
    try:
        sqlite_db.execute("PRAGMA foreign_keys=ON;")

        # Connect to the PostgreSQL target.
        pg_conn = psycopg2.connect(**PG_CONFIG)
        try:
            migrate(sqlite_db, pg_conn)
        finally:
            pg_conn.close()
    finally:
        sqlite_db.close()

只需要将SQLite数据库和postgres数据库换成自己相应的数据库就可以了。当然,不要忘记安装 PostgreSQL 相关的依赖。

1.安装依赖

pip install psycopg2-binary

2.运行脚本

python sqlite_to_postgres.py

交互式迁移

当然,再通用一些,可以把自定义的内容直接通过交互界面输入。(注意:紧接着的这段代码仍然使用写死在脚本里的配置,主要是增加了默认值转换;真正通过 input() 交互输入连接信息的入口,在后文"加参数迁移"一节脚本的 main() 中。)代码如下:

import sqlite3
import psycopg2
from psycopg2 import sql

# ==== Configuration ====
SQLITE_DB_PATH = "memos_prod.db"  # path of the SQLite database file

# Keyword arguments handed to psycopg2.connect().
# Real credentials must never be committed or published with the script:
# "user" and "password" are placeholders to fill in locally before running.
PG_CONFIG = {
    "dbname": "memos",
    "user": "用户名",
    "password": "密码",
    "host": "localhost",
    "port": 5432,
}


def map_sqlite_type(sqlite_type):
    """Translate a SQLite declared column type into a PostgreSQL type name.

    The declared type is upper-cased and scanned for known fragments; the
    first rule with a matching fragment wins. A falsy declared type is
    treated as an empty string and therefore yields the ``"TEXT"`` fallback.
    """
    declared = "" if not sqlite_type else sqlite_type.upper()

    # Ordered rules: (fragments to look for, resulting PostgreSQL type).
    rules = (
        (("INT",), "BIGINT"),
        (("CHAR", "CLOB", "TEXT"), "TEXT"),
        (("REAL", "FLOA", "DOUB"), "DOUBLE PRECISION"),
        (("BLOB",), "BYTEA"),
        (("NUMERIC", "DECIMAL"), "NUMERIC"),
    )
    for fragments, pg_type in rules:
        for fragment in fragments:
            if fragment in declared:
                return pg_type

    # Nothing matched: fall back to TEXT.
    return "TEXT"


def convert_default_value(dflt_value, pg_type):
    """Convert a SQLite column default into a PostgreSQL-compatible one.

    Args:
        dflt_value: Default expression text as reported by SQLite, or
            ``None`` when the column has no default.
        pg_type: Target PostgreSQL type (accepted for interface symmetry;
            the conversion does not depend on it).

    Returns:
        SQL text usable after ``DEFAULT``, or ``None`` when there is no
        default or the expression is not recognised (the caller then omits
        the DEFAULT clause instead of emitting invalid SQL).
    """
    if dflt_value is None:
        return None

    # Drop wrapping parentheses and surrounding whitespace.
    val = str(dflt_value).strip().strip("()").strip()

    # String literal: checked first so that a literal which merely contains
    # the word "strftime" is not rewritten. A lone quote is not a literal.
    if len(val) >= 2 and val.startswith("'") and val.endswith("'"):
        return val

    # SQLite time functions.
    if "strftime" in val.lower():
        # strftime('%s','now') => current Unix timestamp in seconds
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    if val.upper() in ("CURRENT_TIMESTAMP", "NOW"):
        return "CURRENT_TIMESTAMP"

    # Numeric literal, optionally signed: previously "-1" was dropped
    # because str.isdigit() rejects the sign.
    unsigned = val[1:] if val[:1] in ("+", "-") else val
    if unsigned.isascii() and unsigned.replace(".", "", 1).isdigit():
        return val

    # Anything else is discarded to avoid a DDL error.
    return None


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _create_table(pg_cur, table, columns_info):
    """Create *table* in PostgreSQL from its PRAGMA table_info rows."""
    column_defs = []
    for _cid, name, ctype, notnull, dflt_value, _pk in columns_info:
        pg_type = map_sqlite_type(ctype)
        col_def = f"{_quote_ident(name)} {pg_type}"
        if notnull:
            col_def += " NOT NULL"
        converted_default = convert_default_value(dflt_value, pg_type)
        if converted_default:
            col_def += f" DEFAULT {converted_default}"
        column_defs.append(col_def)

    # The pk field is the 1-based position of the column inside the key.
    key_cols = sorted((col[5], col[1]) for col in columns_info if col[5])
    if key_cols:
        key_sql = ", ".join(_quote_ident(name) for _pos, name in key_cols)
        column_defs.append(f"PRIMARY KEY ({key_sql})")

    pg_cur.execute(
        f"CREATE TABLE IF NOT EXISTS {_quote_ident(table)} ({', '.join(column_defs)});"
    )


def _copy_rows(sqlite_cur, pg_cur, table, columns_info):
    """Insert every row of the SQLite *table* into the PostgreSQL table."""
    sqlite_cur.execute(f"SELECT * FROM {_quote_ident(table)};")
    rows = sqlite_cur.fetchall()
    if not rows:
        return
    # Column names are quoted the same way as in CREATE TABLE; unquoted
    # reserved words or mixed-case names would not match.
    col_list = ", ".join(_quote_ident(col[1]) for col in columns_info)
    placeholders = ", ".join(["%s"] * len(columns_info))
    pg_cur.executemany(
        f"INSERT INTO {_quote_ident(table)} ({col_list}) VALUES ({placeholders})", rows
    )


def _copy_indexes(sqlite_cur, pg_cur, table):
    """Recreate the plain column indexes of *table* in PostgreSQL."""
    # PRAGMA index_list rows: seq, name, unique, origin, partial
    sqlite_cur.execute(f"PRAGMA index_list({_quote_ident(table)});")
    for _seq, idx_name, unique, origin, partial in sqlite_cur.fetchall():
        if origin == "pk":
            continue  # PostgreSQL already indexes the primary key
        sqlite_cur.execute(f"PRAGMA index_info({_quote_ident(idx_name)});")
        idx_cols = [row[2] for row in sqlite_cur.fetchall()]
        if not idx_cols:
            continue
        if partial or any(col is None for col in idx_cols):
            # The WHERE clause / expression is not exposed by these PRAGMAs.
            print(f"⚠️ 跳过索引 {idx_name}: 部分索引/表达式索引无法自动迁移")
            continue
        unique_sql = "UNIQUE " if unique else ""
        cols_sql = ", ".join(_quote_ident(col) for col in idx_cols)
        pg_cur.execute(
            f"CREATE {unique_sql}INDEX IF NOT EXISTS {_quote_ident(idx_name)} "
            f"ON {_quote_ident(table)} ({cols_sql});"
        )


def _foreign_key_statements(sqlite_cur, table):
    """Return one ALTER TABLE ... ADD CONSTRAINT statement per foreign key."""
    # PRAGMA foreign_key_list rows:
    # id, seq, table, from, to, on_update, on_delete, match
    sqlite_cur.execute(f"PRAGMA foreign_key_list({_quote_ident(table)});")
    grouped = {}
    for fk_id, seq, ref_table, from_col, to_col, on_update, on_delete, _match in sqlite_cur.fetchall():
        # Rows sharing one id are the columns of one composite key.
        grouped.setdefault(fk_id, []).append(
            (seq, ref_table, from_col, to_col, on_update, on_delete)
        )

    statements = []
    for parts in grouped.values():
        parts.sort(key=lambda item: item[0])
        _seq, ref_table, _from, _to, on_update, on_delete = parts[0]
        from_cols = [item[2] for item in parts]
        to_cols = [item[3] for item in parts]
        name_parts = [table, *from_cols, ref_table, *[c for c in to_cols if c is not None]]
        fk_name = "fk_" + "_".join(name_parts)
        fk_sql = (
            f"ALTER TABLE {_quote_ident(table)} ADD CONSTRAINT {_quote_ident(fk_name)} "
            f"FOREIGN KEY ({', '.join(_quote_ident(c) for c in from_cols)}) "
            f"REFERENCES {_quote_ident(ref_table)}"
        )
        # "to" is NULL when the parent's primary key is referenced implicitly.
        if all(c is not None for c in to_cols):
            fk_sql += f" ({', '.join(_quote_ident(c) for c in to_cols)})"
        if on_update != "NO ACTION":
            fk_sql += f" ON UPDATE {on_update}"
        if on_delete != "NO ACTION":
            fk_sql += f" ON DELETE {on_delete}"
        statements.append(fk_sql)
    return statements


def migrate(sqlite_db, pg_conn):
    """Copy every user table from SQLite into PostgreSQL.

    Structure, rows and indexes are migrated table by table and committed
    per table. Foreign keys are added afterwards, once every table exists;
    a constraint that already exists is skipped.

    Args:
        sqlite_db: Open ``sqlite3`` connection to read from.
        pg_conn: Open ``psycopg2`` connection to write to.
    """
    sqlite_cur = sqlite_db.cursor()
    pg_cur = pg_conn.cursor()

    # Every "sqlite_*" table is SQLite-internal bookkeeping, not user data.
    sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [row[0] for row in sqlite_cur.fetchall() if not row[0].startswith("sqlite_")]

    fk_statements = []
    for table in tables:
        print(f"正在迁移表: {table}")

        # PRAGMA table_info rows: cid, name, type, notnull, dflt_value, pk
        sqlite_cur.execute(f"PRAGMA table_info({_quote_ident(table)});")
        columns_info = sqlite_cur.fetchall()

        _create_table(pg_cur, table, columns_info)
        _copy_rows(sqlite_cur, pg_cur, table, columns_info)
        pg_conn.commit()

        _copy_indexes(sqlite_cur, pg_cur, table)
        fk_statements.extend(_foreign_key_statements(sqlite_cur, table))
        pg_conn.commit()

    for fk_sql in fk_statements:
        try:
            pg_cur.execute(fk_sql)
        except psycopg2.errors.DuplicateObject:
            # Constraint already present: skip it. Everything else has been
            # committed, so this rollback only discards the failed statement
            # (previously it also threw away the table's new indexes).
            pg_conn.rollback()
        else:
            pg_conn.commit()

    print("✅ 数据、主键、索引、外键迁移完成!")


if __name__ == "__main__":
    # Connect to SQLite. Everything after this line runs under try/finally
    # so the handle is released even if the PostgreSQL connection fails.
    sqlite_db = sqlite3.connect(SQLITE_DB_PATH)
    try:
        sqlite_db.execute("PRAGMA foreign_keys=ON;")

        # Connect to PostgreSQL; closed again as soon as migrate() returns
        # or raises.
        pg_conn = psycopg2.connect(**PG_CONFIG)
        try:
            migrate(sqlite_db, pg_conn)
        finally:
            pg_conn.close()
    finally:
        sqlite_db.close()

这样,运行代码,就可以交互式地运行迁移脚本了。

1.安装依赖

pip install psycopg2-binary

2.运行脚本

python sqlite_to_postgres.py

=== SQLite -> PostgreSQL 数据迁移工具 ===
请输入 SQLite 数据库文件路径 (例如: ./memos_prod.db):
请输入 PostgreSQL 数据库名称:
请输入 PostgreSQL 用户名:
请输入 PostgreSQL 密码:
请输入 PostgreSQL 主机 (默认: localhost):
请输入 PostgreSQL 端口 (默认: 5432):

加参数迁移

#!/usr/bin/env python3
import argparse
import sqlite3
import psycopg2
from psycopg2 import sql
import getpass
import json

# ================= 工具函数 =================

def convert_type(ctype: str):
    """Map a SQLite declared column type to a PostgreSQL column type.

    Matching is done by case-insensitive substring, checked in this order:
    INT, CHAR/CLOB/TEXT, BLOB, REAL/FLOA/DOUB, BOOL, JSON, NUMERIC/DECIMAL.

    Args:
        ctype: Declared type text of the column. ``None`` or an empty string
            (a column declared without a type) is accepted.

    Returns:
        The PostgreSQL type name; ``"TEXT"`` when nothing matches.
    """
    # A missing declared type must not crash the migration: treat it as "".
    declared = (ctype or "").upper()
    if "INT" in declared:
        return "BIGINT"
    if "CHAR" in declared or "CLOB" in declared or "TEXT" in declared:
        return "TEXT"
    if "BLOB" in declared:
        return "BYTEA"
    if "REAL" in declared or "FLOA" in declared or "DOUB" in declared:
        return "DOUBLE PRECISION"
    if "BOOL" in declared:
        return "BOOLEAN"
    if "JSON" in declared:
        return "JSONB"
    # Keep exact numerics numeric, matching the mapping used by the other
    # scripts in this file instead of degrading them to TEXT.
    if "NUMERIC" in declared or "DECIMAL" in declared:
        return "NUMERIC"
    return "TEXT"

def convert_default_value(dflt_value, pg_type):
    """Turn a SQLite column default into SQL that PostgreSQL accepts.

    Args:
        dflt_value: Default expression text as reported by SQLite, or
            ``None`` when the column has no default.
        pg_type: Target PostgreSQL type (kept for interface symmetry; the
            conversion does not depend on it).

    Returns:
        Text to place after ``DEFAULT``, or ``None`` when there is no default
        or the expression is not recognised and should be skipped.
    """
    if dflt_value is None:
        return None

    # Remove wrapping parentheses and surrounding whitespace.
    val = str(dflt_value).strip().strip("()").strip()

    is_literal = len(val) >= 2 and val[0] == "'" and val[-1] == "'"
    if is_literal:
        # Checked before the time functions so a literal that merely
        # contains "strftime" is kept verbatim.
        return val

    # Timestamps
    if "strftime" in val.lower():
        return "EXTRACT(EPOCH FROM NOW())::BIGINT"
    if val.upper() in ("CURRENT_TIMESTAMP", "NOW"):
        return "CURRENT_TIMESTAMP"

    # Numbers, with an optional leading sign: a bare isdigit() test used to
    # drop negative defaults such as -1.
    digits = val[1:] if val.startswith(("+", "-")) else val
    if digits.isascii() and digits.replace(".", "", 1).isdigit():
        return val

    return None  # unrecognised defaults are skipped

_TRUE_TEXT = frozenset({"1", "t", "true", "y", "yes", "on"})
_FALSE_TEXT = frozenset({"0", "f", "false", "n", "no", "off"})


def _to_bool(val):
    """Convert a SQLite value to bool, understanding textual booleans."""
    if isinstance(val, str):
        text = val.strip().lower()
        if text in _TRUE_TEXT:
            return True
        if text in _FALSE_TEXT:
            return False
    # Numbers (0/1) and unrecognised values keep Python truthiness.
    return bool(val)


def adapt_row_for_postgres(row, column_names, col_types):
    """Convert one SQLite row so its values fit the PostgreSQL column types.

    Args:
        row: Sequence of values for one row, in column order.
        column_names: Column names, in the same order as *row*.
        col_types: Mapping of column name to the PostgreSQL type name
            (``"boolean"``, ``"json"``, ``"jsonb"``, ``"bytea"``, ...).
            Columns missing from the mapping are passed through unchanged.

    Returns:
        A tuple with the adapted values:

        * ``None`` stays ``None``.
        * boolean columns: 0/1 and textual forms such as ``"false"`` or
          ``"t"`` become real booleans (plain ``bool("false")`` is True).
        * json/jsonb columns: a string that already parses as JSON is kept;
          anything else is serialised with ``json.dumps``.
        * bytea columns: ``memoryview`` objects are copied to ``bytes``.
    """
    new_row = []
    for col, val in zip(column_names, row):
        pg_type = col_types.get(col)
        if val is None:
            new_row.append(None)
        elif pg_type == "boolean":
            new_row.append(_to_bool(val))
        elif pg_type in ("json", "jsonb"):
            if isinstance(val, str):
                try:
                    json.loads(val)  # only validates; the original text is kept
                    new_row.append(val)
                except json.JSONDecodeError:
                    new_row.append(json.dumps(val))
            else:
                new_row.append(json.dumps(val))
        elif pg_type == "bytea" and isinstance(val, memoryview):
            new_row.append(bytes(val))
        else:
            new_row.append(val)
    return tuple(new_row)

# ================= 核心迁移函数 =================

def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def migrate(sqlite_db, pg_conn, schema_only=False, data_only=False,
            include_tables=None, exclude_tables=None, if_exists="skip"):
    """Migrate tables from a SQLite file into PostgreSQL.

    Args:
        sqlite_db: Path of the SQLite database file.
        pg_conn: Open ``psycopg2`` connection to the target database.
        schema_only: Create the tables but copy no rows.
        data_only: Copy rows but do not create the tables.
        include_tables: If given, only these tables are migrated.
        exclude_tables: If given, these tables are skipped.
        if_exists: Conflict policy for rows. ``"skip"`` keeps existing rows
            (``ON CONFLICT DO NOTHING``), ``"update"`` upserts on the primary
            key, any other value (``"replace"``) truncates the target table
            first and inserts everything again.
    """
    sqlite_conn = sqlite3.connect(sqlite_db)
    try:
        sqlite_cur = sqlite_conn.cursor()
        pg_cur = pg_conn.cursor()

        # Every "sqlite_*" table is SQLite-internal bookkeeping, not user data.
        sqlite_cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [r[0] for r in sqlite_cur.fetchall() if not r[0].startswith("sqlite_")]

        if include_tables:
            tables = [t for t in tables if t in include_tables]
        if exclude_tables:
            tables = [t for t in tables if t not in exclude_tables]

        for table in tables:
            print(f"正在迁移表: {table}")
            q_table = _quote_ident(table)

            # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
            sqlite_cur.execute(f"PRAGMA table_info({q_table});")
            columns_info = sqlite_cur.fetchall()
            column_names = [c[1] for c in columns_info]

            # Primary-key columns in key order (pk is the 1-based position).
            pk_names = [c[1] for c in sorted(columns_info, key=lambda c: c[5]) if c[5]]

            # Only a single-column INTEGER key becomes BIGSERIAL. Emitting an
            # inline PRIMARY KEY for each integer column of a composite key
            # would make CREATE TABLE fail.
            serial_pk = None
            if len(pk_names) == 1:
                sole = next(c for c in columns_info if c[1] == pk_names[0])
                if (sole[2] or "").upper() in ("INTEGER", "INT"):
                    serial_pk = sole[1]

            column_defs = []
            for _cid, name, ctype, notnull, dflt_value, _pk in columns_info:
                if name == serial_pk:
                    column_defs.append(f"{_quote_ident(name)} BIGSERIAL PRIMARY KEY")
                    continue
                pg_type = convert_type(ctype or "")
                col_def = f"{_quote_ident(name)} {pg_type}"
                if notnull:
                    col_def += " NOT NULL"
                default_val = convert_default_value(dflt_value, pg_type)
                if default_val:
                    col_def += f" DEFAULT {default_val}"
                column_defs.append(col_def)
            if pk_names and serial_pk is None:
                key_sql = ", ".join(_quote_ident(n) for n in pk_names)
                column_defs.append(f"PRIMARY KEY ({key_sql})")

            # Create the table
            if not data_only:
                pg_cur.execute(
                    f"CREATE TABLE IF NOT EXISTS {q_table} ({', '.join(column_defs)});"
                )
                pg_conn.commit()

            if schema_only:
                continue

            # Insert the data
            replace_mode = if_exists not in ("skip", "update")
            if replace_mode:
                # Truncate even when the source is empty so the target really
                # mirrors it.
                # NOTE(review): CASCADE also empties tables that reference
                # this one through foreign keys - confirm that is intended
                # when running against a schema that has such constraints.
                pg_cur.execute(f"TRUNCATE TABLE {q_table} RESTART IDENTITY CASCADE;")

            sqlite_cur.execute(f"SELECT * FROM {q_table};")
            rows = sqlite_cur.fetchall()
            if not rows:
                pg_conn.commit()
                continue

            placeholders = ", ".join(["%s"] * len(column_names))

            # Look up the PostgreSQL column types to adapt the values.
            pg_cur.execute(
                "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s;",
                (table,)
            )
            col_types = {row[0]: row[1] for row in pg_cur.fetchall()}

            converted_rows = [
                adapt_row_for_postgres(row, column_names, col_types) for row in rows
            ]

            # Column names are quoted the same way as in CREATE TABLE.
            col_list = ", ".join(_quote_ident(c) for c in column_names)
            insert_sql = f"INSERT INTO {q_table} ({col_list}) VALUES ({placeholders})"
            if if_exists == "skip":
                insert_sql += " ON CONFLICT DO NOTHING"
            elif if_exists == "update":
                non_key = [c for c in column_names if c not in pk_names]
                if pk_names and non_key:
                    target = ", ".join(_quote_ident(c) for c in pk_names)
                    assignments = ", ".join(
                        f"{_quote_ident(c)}=EXCLUDED.{_quote_ident(c)}" for c in non_key
                    )
                    insert_sql += f" ON CONFLICT ({target}) DO UPDATE SET {assignments}"
                else:
                    # No primary key to match on, or nothing besides the key
                    # to update: an upsert cannot be expressed.
                    print(f"⚠️ 表 {table} 无法按主键更新, 改为跳过冲突行")
                    insert_sql += " ON CONFLICT DO NOTHING"

            pg_cur.executemany(insert_sql, converted_rows)
            pg_conn.commit()

            # Move the serial sequence past the imported ids.
            if serial_pk is not None:
                try:
                    # pg_get_serial_sequence finds the sequence whatever its
                    # name is and yields NULL (a no-op for setval) when the
                    # column has none.
                    pg_cur.execute(
                        sql.SQL(
                            "SELECT setval(pg_get_serial_sequence(%s, %s), "
                            "COALESCE(MAX({id}), 1)) FROM {tbl}"
                        ).format(
                            id=sql.Identifier(serial_pk),
                            tbl=sql.Identifier(table),
                        ),
                        (q_table, serial_pk),
                    )
                    pg_conn.commit()
                except psycopg2.Error as e:
                    # Leave the failed transaction, otherwise every later
                    # statement on this connection would be rejected.
                    pg_conn.rollback()
                    print(f"⚠️ 更新序列失败: 表 {table}, 列 {serial_pk}, 错误: {e}")
    finally:
        sqlite_conn.close()

# ================= 主程序入口 =================

def _split_table_args(values):
    """Flatten table names so both ``a b`` and ``a,b`` spellings are accepted."""
    if not values:
        return None
    names = [part.strip() for value in values for part in value.split(",")]
    return [name for name in names if name] or None


def main():
    """Command-line entry point: parse options, prompt for connection
    details, then run the migration.

    ``--schema-only`` and ``--data-only`` are mutually exclusive (together
    they would migrate nothing). Table lists may be separated by spaces or
    commas. The PostgreSQL connection is always closed, even on failure.
    """
    import os  # local import: only needed to validate the SQLite path

    parser = argparse.ArgumentParser(description="SQLite → PostgreSQL 数据迁移工具")
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--schema-only", action="store_true", help="仅迁移表结构")
    mode.add_argument("--data-only", action="store_true", help="仅迁移表数据")
    parser.add_argument("--tables", nargs="+", help="仅迁移指定表")
    parser.add_argument("--exclude-tables", nargs="+", help="排除指定表")
    parser.add_argument("--if-exists", choices=["skip", "update", "replace"], default="skip",
                        help="遇到冲突时的处理方式")
    args = parser.parse_args()

    print("=== SQLite -> PostgreSQL 数据迁移工具 ===")
    sqlite_db = input("请输入 SQLite 数据库文件路径 (例如: ./memos_prod.db): ").strip()
    # sqlite3.connect() silently creates a missing file, which would turn a
    # mistyped path into an "empty but successful" migration.
    if not os.path.isfile(sqlite_db):
        parser.error(f"找不到 SQLite 数据库文件: {sqlite_db}")

    pg_db = input("请输入 PostgreSQL 数据库名称: ").strip()
    pg_user = input("请输入 PostgreSQL 用户名: ").strip()
    pg_password = getpass.getpass("请输入 PostgreSQL 密码: ")
    pg_host = input("请输入 PostgreSQL 主机 (默认: localhost): ").strip() or "localhost"
    pg_port = input("请输入 PostgreSQL 端口 (默认: 5432): ").strip() or "5432"

    pg_conn = psycopg2.connect(
        dbname=pg_db, user=pg_user, password=pg_password, host=pg_host, port=pg_port
    )
    try:
        migrate(
            sqlite_db,
            pg_conn,
            schema_only=args.schema_only,
            data_only=args.data_only,
            include_tables=_split_table_args(args.tables),
            exclude_tables=_split_table_args(args.exclude_tables),
            if_exists=args.if_exists,
        )
    finally:
        pg_conn.close()
    print("=== 数据迁移完成 ===")

if __name__ == "__main__":
    main()

全库迁移(结构 + 数据)

python sqlite_to_postgres.py

只迁移结构

python sqlite_to_postgres.py --schema-only

只迁移数据

python sqlite_to_postgres.py --data-only

只迁移某几个表(例如 users、tasks)

python sqlite_to_postgres.py --tables users tasks

排除日志表

python sqlite_to_postgres.py --exclude-tables logs audit_trail

只迁移 users, tasks,但排除 tasks

python sqlite_to_postgres.py --tables users tasks --exclude-tables tasks

只迁移数据,遇到冲突时更新

冲突处理模式(--if-exists skip|update|replace),默认是 skip

python sqlite_to_postgres.py --data-only --if-exists update

后记

以上几个脚本,只有第一个是亲测有效。交互式迁移与加参数迁移都未曾验证,有用到的同学遇到任何问题,欢迎留言交流。