From 71804bf5bc6db0f289cee1459d550340f86ff3ff Mon Sep 17 00:00:00 2001 From: "Ethan T." Date: Fri, 13 Mar 2026 11:53:01 +0800 Subject: [PATCH] fix(db_models): guard MySQL-specific SQL in migration with DB_TYPE check (fixes #13544) (#13582) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Fixes #13544: PostgreSQL startup crash because `update_tenant_llm_to_id_primary_key()` unconditionally uses MySQL-specific SQL. - Split `update_tenant_llm_to_id_primary_key()` into `_update_tenant_llm_to_id_primary_key_mysql()` and `_update_tenant_llm_to_id_primary_key_postgres()`, dispatching on `settings.DATABASE_TYPE` - MySQL path: unchanged (existing `DATABASE()`, `SET @row = 0`, `AUTO_INCREMENT`, `DROP PRIMARY KEY` logic) - PostgreSQL path: uses `current_database()`, `ROW_NUMBER() OVER (ORDER BY ...)` for sequential IDs, `CREATE SEQUENCE` + `nextval()` for auto-increment, and `information_schema.table_constraints` to find the PK constraint name - Also fix `migrate_add_unique_email()`: MySQL-only `information_schema.statistics` is replaced with `pg_indexes` on PostgreSQL ## Test plan - [ ] Start RAGFlow with `DB_TYPE=postgres` — startup should complete without `function database() does not exist` error - [ ] Start RAGFlow with `DB_TYPE=mysql` (default) — existing behaviour unchanged, migration runs as before - [ ] Fresh PostgreSQL install: verify `tenant_llm.id` column is created as a serial primary key after migration - [ ] Idempotency: running migration twice on PostgreSQL should be a no-op (column already exists check passes) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: gambletan Co-authored-by: Claude Sonnet 4.6 --- api/db/db_models.py | 134 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 113 insertions(+), 21 deletions(-) diff --git a/api/db/db_models.py b/api/db/db_models.py index 2e3824050..bf265f585 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -1370,14 +1370,22 @@ def migrate_add_unique_email(migrator): """Deduplicates user emails and add UNIQUE constraint to email column (idempotent)""" # step 0: check if UNIQUE index on email already exists try: - cursor = DB.execute_sql(""" - SELECT COUNT(*) - FROM information_schema.statistics - WHERE table_schema = DATABASE() - AND table_name = 'user' - AND index_name = 'user_email' - AND non_unique = 0 - """) + if settings.DATABASE_TYPE.upper() == "POSTGRES": + cursor = DB.execute_sql(""" + SELECT COUNT(*) + FROM pg_indexes + WHERE tablename = 'user' + AND indexname = 'user_email' + """) + else: + cursor = DB.execute_sql(""" + SELECT COUNT(*) + FROM information_schema.statistics + WHERE table_schema = DATABASE() + AND table_name = 'user' + AND index_name = 'user_email' + AND non_unique = 0 + """) result = cursor.fetchone() if result and result[0] > 0: logging.info("UNIQUE index on user.email already exists, skipping migration") @@ -1422,14 +1430,22 @@ def migrate_add_unique_email(migrator): def update_tenant_llm_to_id_primary_key(): """Add ID and set to primary key step by step.""" + if settings.DATABASE_TYPE.upper() == "POSTGRES": + _update_tenant_llm_to_id_primary_key_postgres() + else: + _update_tenant_llm_to_id_primary_key_mysql() + + +def _update_tenant_llm_to_id_primary_key_mysql(): + """MySQL implementation: Add ID column and set as AUTO_INCREMENT primary key.""" try: with DB.atomic(): - # 0. Check if exist ID + # 0. Check if 'id' column already exists cursor = DB.execute_sql(""" - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = DATABASE() - AND TABLE_NAME = 'tenant_llm' + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'tenant_llm' AND COLUMN_NAME = 'id' """) if cursor.rowcount > 0: @@ -1438,22 +1454,22 @@ def update_tenant_llm_to_id_primary_key(): # 1. Add nullable column DB.execute_sql("ALTER TABLE tenant_llm ADD COLUMN temp_id INT NULL") - # 2. Set ID + # 2. Set ID using MySQL user variables DB.execute_sql("SET @row = 0;") DB.execute_sql("UPDATE tenant_llm SET temp_id = (@row := @row + 1) ORDER BY tenant_id, llm_factory, llm_name;") # 3. Drop old primary key DB.execute_sql("ALTER TABLE tenant_llm DROP PRIMARY KEY") - # 4. Update ID column to primary key + # 4. Update ID column to primary key with AUTO_INCREMENT DB.execute_sql(""" - ALTER TABLE tenant_llm + ALTER TABLE tenant_llm MODIFY COLUMN temp_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY """) # 5. Add unique key DB.execute_sql(""" - ALTER TABLE tenant_llm + ALTER TABLE tenant_llm ADD CONSTRAINT uk_tenant_llm UNIQUE (tenant_id, llm_factory, llm_name) """) @@ -1465,16 +1481,92 @@ def update_tenant_llm_to_id_primary_key(): except Exception as e: logging.error(str(e)) cursor = DB.execute_sql(""" - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = DATABASE() - AND TABLE_NAME = 'tenant_llm' + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'tenant_llm' AND COLUMN_NAME = 'temp_id' """) if cursor.rowcount > 0: DB.execute_sql("ALTER TABLE tenant_llm DROP COLUMN temp_id") +def _update_tenant_llm_to_id_primary_key_postgres(): + """PostgreSQL implementation: Add SERIAL primary key column to tenant_llm.""" + try: + with DB.atomic(): + # 0. Check if 'id' column already exists + cursor = DB.execute_sql(""" + SELECT column_name + FROM information_schema.columns + WHERE table_catalog = current_database() + AND table_name = 'tenant_llm' + AND column_name = 'id' + """) + if cursor.rowcount > 0: + return + + # 1. Add nullable integer column + DB.execute_sql("ALTER TABLE tenant_llm ADD COLUMN temp_id INTEGER NULL") + + # 2. Assign sequential row numbers ordered consistently + DB.execute_sql(""" + UPDATE tenant_llm + SET temp_id = subq.rn + FROM ( + SELECT ctid, + ROW_NUMBER() OVER (ORDER BY tenant_id, llm_factory, llm_name) AS rn + FROM tenant_llm + ) AS subq + WHERE tenant_llm.ctid = subq.ctid + """) + + # 3. Drop old composite primary key constraint + cursor = DB.execute_sql(""" + SELECT constraint_name + FROM information_schema.table_constraints + WHERE table_catalog = current_database() + AND table_name = 'tenant_llm' + AND constraint_type = 'PRIMARY KEY' + """) + row = cursor.fetchone() + if row: + DB.execute_sql(f'ALTER TABLE tenant_llm DROP CONSTRAINT "{row[0]}"') + + # 4. Make temp_id NOT NULL and create a sequence for it + DB.execute_sql("ALTER TABLE tenant_llm ALTER COLUMN temp_id SET NOT NULL") + DB.execute_sql("CREATE SEQUENCE IF NOT EXISTS tenant_llm_id_seq") + DB.execute_sql(""" + SELECT setval('tenant_llm_id_seq', COALESCE((SELECT MAX(temp_id) FROM tenant_llm), 0)) + """) + DB.execute_sql("ALTER TABLE tenant_llm ALTER COLUMN temp_id SET DEFAULT nextval('tenant_llm_id_seq')") + DB.execute_sql("ALTER SEQUENCE tenant_llm_id_seq OWNED BY tenant_llm.temp_id") + DB.execute_sql("ALTER TABLE tenant_llm ADD PRIMARY KEY (temp_id)") + + # 5. Add unique constraint + DB.execute_sql(""" + ALTER TABLE tenant_llm + ADD CONSTRAINT uk_tenant_llm UNIQUE (tenant_id, llm_factory, llm_name) + """) + + # 6. Rename temp_id to id + DB.execute_sql("ALTER TABLE tenant_llm RENAME COLUMN temp_id TO id") + + logging.info("Successfully updated tenant_llm to id primary key (PostgreSQL).") + + except Exception as e: + logging.error(str(e)) + cursor = DB.execute_sql(""" + SELECT column_name + FROM information_schema.columns + WHERE table_catalog = current_database() + AND table_name = 'tenant_llm' + AND column_name = 'temp_id' + """) + if cursor.rowcount > 0: + DB.execute_sql("ALTER TABLE tenant_llm DROP COLUMN temp_id") + + def migrate_db(): logging.disable(logging.ERROR) migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)