feat:mysql adaptation for meta db (#50)

* phase1:uuid stringdefault timestamp date

* phase2:json adapt

* phase3:BinaryData,ARRAY,Text,

* fix sa.String()

* fix defalut valud

* models adapt

* models adapt supplement

* fix pg unique dialect

* supplement

* migration fix

* migration fix

* date sql adapt

* model adapt fix

* config adapt

* migration fix

* fix

* fix anotation in .env.example

* config feat

* config fix

* .env.example adapt fix
This commit is contained in:
longbingljw
2025-11-07 15:35:31 +08:00
committed by GitHub
parent 829796514a
commit fcc71124b1
125 changed files with 6096 additions and 2427 deletions

View File

@ -7,7 +7,8 @@ from enum import StrEnum
from typing import Any, ClassVar
from sqlalchemy import Engine, orm, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.sql.expression import and_, or_
@ -627,28 +628,51 @@ def _batch_upsert_draft_variable(
#
# For these reasons, we use the SQLAlchemy query builder and rely on dialect-specific
# insert operations instead of the ORM layer.
stmt = insert(WorkflowDraftVariable).values([_model_to_insertion_dict(v) for v in draft_vars])
if policy == _UpsertPolicy.OVERWRITE:
stmt = stmt.on_conflict_do_update(
index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(),
set_={
# Use different insert statements based on database type
if dify_config.SQLALCHEMY_DATABASE_URI_SCHEME == "postgresql":
stmt = pg_insert(WorkflowDraftVariable).values([_model_to_insertion_dict(v) for v in draft_vars])
if policy == _UpsertPolicy.OVERWRITE:
stmt = stmt.on_conflict_do_update(
index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(),
set_={
# Refresh creation timestamp to ensure updated variables
# appear first in chronologically sorted result sets.
"created_at": stmt.excluded.created_at,
"updated_at": stmt.excluded.updated_at,
"last_edited_at": stmt.excluded.last_edited_at,
"description": stmt.excluded.description,
"value_type": stmt.excluded.value_type,
"value": stmt.excluded.value,
"visible": stmt.excluded.visible,
"editable": stmt.excluded.editable,
"node_execution_id": stmt.excluded.node_execution_id,
"file_id": stmt.excluded.file_id,
},
)
elif policy == _UpsertPolicy.IGNORE:
stmt = stmt.on_conflict_do_nothing(index_elements=WorkflowDraftVariable.unique_app_id_node_id_name())
else:
stmt = mysql_insert(WorkflowDraftVariable).values([_model_to_insertion_dict(v) for v in draft_vars])
if policy == _UpsertPolicy.OVERWRITE:
stmt = stmt.on_duplicate_key_update(
# Refresh creation timestamp to ensure updated variables
# appear first in chronologically sorted result sets.
"created_at": stmt.excluded.created_at,
"updated_at": stmt.excluded.updated_at,
"last_edited_at": stmt.excluded.last_edited_at,
"description": stmt.excluded.description,
"value_type": stmt.excluded.value_type,
"value": stmt.excluded.value,
"visible": stmt.excluded.visible,
"editable": stmt.excluded.editable,
"node_execution_id": stmt.excluded.node_execution_id,
"file_id": stmt.excluded.file_id,
},
)
elif policy == _UpsertPolicy.IGNORE:
stmt = stmt.on_conflict_do_nothing(index_elements=WorkflowDraftVariable.unique_app_id_node_id_name())
else:
created_at=stmt.inserted.created_at,
updated_at=stmt.inserted.updated_at,
last_edited_at=stmt.inserted.last_edited_at,
description=stmt.inserted.description,
value_type=stmt.inserted.value_type,
value=stmt.inserted.value,
visible=stmt.inserted.visible,
editable=stmt.inserted.editable,
node_execution_id=stmt.inserted.node_execution_id,
file_id=stmt.inserted.file_id,
)
elif policy == _UpsertPolicy.IGNORE:
stmt = stmt.prefix_with("IGNORE")
if policy not in [_UpsertPolicy.OVERWRITE, _UpsertPolicy.IGNORE]:
raise Exception("Invalid value for update policy.")
session.execute(stmt)