fix(db_models): guard MySQL-specific SQL in migration with DB_TYPE check (fixes #13544) (#13582)

## Summary

Fixes #13544: PostgreSQL startup crash because
`update_tenant_llm_to_id_primary_key()` unconditionally uses
MySQL-specific SQL.

- Split `update_tenant_llm_to_id_primary_key()` into
`_update_tenant_llm_to_id_primary_key_mysql()` and
`_update_tenant_llm_to_id_primary_key_postgres()`, dispatching on
`settings.DATABASE_TYPE`
- MySQL path: unchanged (existing `DATABASE()`, `SET @row = 0`,
`AUTO_INCREMENT`, `DROP PRIMARY KEY` logic)
- PostgreSQL path: uses `current_database()`, `ROW_NUMBER() OVER (ORDER
BY ...)` for sequential IDs, `CREATE SEQUENCE` + `nextval()` for
auto-increment, and `information_schema.table_constraints` to find the
PK constraint name
- Also fix `migrate_add_unique_email()`: MySQL-only
`information_schema.statistics` is replaced with `pg_indexes` on
PostgreSQL

## Test plan

- [ ] Start RAGFlow with `DB_TYPE=postgres` — startup should complete
without `function database() does not exist` error
- [ ] Start RAGFlow with `DB_TYPE=mysql` (default) — existing behaviour
unchanged, migration runs as before
- [ ] Fresh PostgreSQL install: verify `tenant_llm.id` column is created
as a serial primary key after migration
- [ ] Idempotency: running migration twice on PostgreSQL should be a
no-op (column already exists check passes)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: gambletan <gambletan@github>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ethan T.
2026-03-13 11:53:01 +08:00
committed by GitHub
parent e90f0e8910
commit 71804bf5bc

View File

@ -1370,14 +1370,22 @@ def migrate_add_unique_email(migrator):
"""Deduplicates user emails and add UNIQUE constraint to email column (idempotent)"""
# step 0: check if UNIQUE index on email already exists
try:
cursor = DB.execute_sql("""
SELECT COUNT(*)
FROM information_schema.statistics
WHERE table_schema = DATABASE()
AND table_name = 'user'
AND index_name = 'user_email'
AND non_unique = 0
""")
if settings.DATABASE_TYPE.upper() == "POSTGRES":
cursor = DB.execute_sql("""
SELECT COUNT(*)
FROM pg_indexes
WHERE tablename = 'user'
AND indexname = 'user_email'
""")
else:
cursor = DB.execute_sql("""
SELECT COUNT(*)
FROM information_schema.statistics
WHERE table_schema = DATABASE()
AND table_name = 'user'
AND index_name = 'user_email'
AND non_unique = 0
""")
result = cursor.fetchone()
if result and result[0] > 0:
logging.info("UNIQUE index on user.email already exists, skipping migration")
@ -1422,14 +1430,22 @@ def migrate_add_unique_email(migrator):
def update_tenant_llm_to_id_primary_key():
"""Add ID and set to primary key step by step."""
if settings.DATABASE_TYPE.upper() == "POSTGRES":
_update_tenant_llm_to_id_primary_key_postgres()
else:
_update_tenant_llm_to_id_primary_key_mysql()
def _update_tenant_llm_to_id_primary_key_mysql():
"""MySQL implementation: Add ID column and set as AUTO_INCREMENT primary key."""
try:
with DB.atomic():
# 0. Check if exist ID
# 0. Check if 'id' column already exists
cursor = DB.execute_sql("""
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'tenant_llm'
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'tenant_llm'
AND COLUMN_NAME = 'id'
""")
if cursor.rowcount > 0:
@ -1438,22 +1454,22 @@ def update_tenant_llm_to_id_primary_key():
# 1. Add nullable column
DB.execute_sql("ALTER TABLE tenant_llm ADD COLUMN temp_id INT NULL")
# 2. Set ID
# 2. Set ID using MySQL user variables
DB.execute_sql("SET @row = 0;")
DB.execute_sql("UPDATE tenant_llm SET temp_id = (@row := @row + 1) ORDER BY tenant_id, llm_factory, llm_name;")
# 3. Drop old primary key
DB.execute_sql("ALTER TABLE tenant_llm DROP PRIMARY KEY")
# 4. Update ID column to primary key
# 4. Update ID column to primary key with AUTO_INCREMENT
DB.execute_sql("""
ALTER TABLE tenant_llm
ALTER TABLE tenant_llm
MODIFY COLUMN temp_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY
""")
# 5. Add unique key
DB.execute_sql("""
ALTER TABLE tenant_llm
ALTER TABLE tenant_llm
ADD CONSTRAINT uk_tenant_llm UNIQUE (tenant_id, llm_factory, llm_name)
""")
@ -1465,16 +1481,92 @@ def update_tenant_llm_to_id_primary_key():
except Exception as e:
logging.error(str(e))
cursor = DB.execute_sql("""
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'tenant_llm'
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'tenant_llm'
AND COLUMN_NAME = 'temp_id'
""")
if cursor.rowcount > 0:
DB.execute_sql("ALTER TABLE tenant_llm DROP COLUMN temp_id")
def _update_tenant_llm_to_id_primary_key_postgres():
"""PostgreSQL implementation: Add SERIAL primary key column to tenant_llm."""
try:
with DB.atomic():
# 0. Check if 'id' column already exists
cursor = DB.execute_sql("""
SELECT column_name
FROM information_schema.columns
WHERE table_catalog = current_database()
AND table_name = 'tenant_llm'
AND column_name = 'id'
""")
if cursor.rowcount > 0:
return
# 1. Add nullable integer column
DB.execute_sql("ALTER TABLE tenant_llm ADD COLUMN temp_id INTEGER NULL")
# 2. Assign sequential row numbers ordered consistently
DB.execute_sql("""
UPDATE tenant_llm
SET temp_id = subq.rn
FROM (
SELECT ctid,
ROW_NUMBER() OVER (ORDER BY tenant_id, llm_factory, llm_name) AS rn
FROM tenant_llm
) AS subq
WHERE tenant_llm.ctid = subq.ctid
""")
# 3. Drop old composite primary key constraint
cursor = DB.execute_sql("""
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_catalog = current_database()
AND table_name = 'tenant_llm'
AND constraint_type = 'PRIMARY KEY'
""")
row = cursor.fetchone()
if row:
DB.execute_sql(f'ALTER TABLE tenant_llm DROP CONSTRAINT "{row[0]}"')
# 4. Make temp_id NOT NULL and create a sequence for it
DB.execute_sql("ALTER TABLE tenant_llm ALTER COLUMN temp_id SET NOT NULL")
DB.execute_sql("CREATE SEQUENCE IF NOT EXISTS tenant_llm_id_seq")
DB.execute_sql("""
SELECT setval('tenant_llm_id_seq', COALESCE((SELECT MAX(temp_id) FROM tenant_llm), 0))
""")
DB.execute_sql("ALTER TABLE tenant_llm ALTER COLUMN temp_id SET DEFAULT nextval('tenant_llm_id_seq')")
DB.execute_sql("ALTER SEQUENCE tenant_llm_id_seq OWNED BY tenant_llm.temp_id")
DB.execute_sql("ALTER TABLE tenant_llm ADD PRIMARY KEY (temp_id)")
# 5. Add unique constraint
DB.execute_sql("""
ALTER TABLE tenant_llm
ADD CONSTRAINT uk_tenant_llm UNIQUE (tenant_id, llm_factory, llm_name)
""")
# 6. Rename temp_id to id
DB.execute_sql("ALTER TABLE tenant_llm RENAME COLUMN temp_id TO id")
logging.info("Successfully updated tenant_llm to id primary key (PostgreSQL).")
except Exception as e:
logging.error(str(e))
cursor = DB.execute_sql("""
SELECT column_name
FROM information_schema.columns
WHERE table_catalog = current_database()
AND table_name = 'tenant_llm'
AND column_name = 'temp_id'
""")
if cursor.rowcount > 0:
DB.execute_sql("ALTER TABLE tenant_llm DROP COLUMN temp_id")
def migrate_db():
logging.disable(logging.ERROR)
migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)