From 98e72521f424f490c1f148ea5aae147a0a4eb6bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9D=9E=E6=B3=95=E6=93=8D=E4=BD=9C?= Date: Mon, 16 Mar 2026 14:04:41 +0800 Subject: [PATCH] chore: change draft var to user scoped (#33066) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: QuantumGhost --- .../console/app/workflow_draft_variable.py | 73 +++++++---- .../rag_pipeline_draft_variable.py | 13 +- .../app/apps/advanced_chat/app_generator.py | 6 +- .../app/apps/pipeline/pipeline_generator.py | 6 +- api/core/app/apps/workflow/app_generator.py | 6 +- ...add_user_id_to_workflow_draft_variables.py | 69 ++++++++++ api/models/workflow.py | 18 ++- api/services/app_dsl_service.py | 2 +- api/services/rag_pipeline/rag_pipeline.py | 2 + .../workflow_draft_variable_service.py | 120 ++++++++++++------ api/services/workflow_service.py | 10 +- .../test_workflow_draft_variable_service.py | 69 +++++++--- .../test_workflow_draft_variable_service.py | 116 ++++++++++++++--- .../controllers/console/app/test_app_apis.py | 1 + .../apps/advanced_chat/test_app_generator.py | 18 +-- .../services/test_app_dsl_service.py | 4 +- .../workflow/test_draft_var_loader_simple.py | 10 +- .../test_workflow_draft_variable_service.py | 38 +++++- .../workflow/test_workflow_service.py | 1 + 19 files changed, 452 insertions(+), 130 deletions(-) create mode 100644 api/migrations/versions/2026_03_04_1600-6b5f9f8b1a2c_add_user_id_to_workflow_draft_variables.py diff --git a/api/controllers/console/app/workflow_draft_variable.py b/api/controllers/console/app/workflow_draft_variable.py index 165bfcd4ba..b78d97a382 100644 --- a/api/controllers/console/app/workflow_draft_variable.py +++ b/api/controllers/console/app/workflow_draft_variable.py @@ -23,7 +23,7 @@ from dify_graph.variables.types import SegmentType from extensions.ext_database import db from factories.file_factory import build_from_mapping, build_from_mappings from factories.variable_factory import build_segment_with_type -from libs.login import login_required +from libs.login import current_user, login_required from models import App, AppMode from models.workflow import WorkflowDraftVariable from services.workflow_draft_variable_service import WorkflowDraftVariableList, WorkflowDraftVariableService @@ -100,6 +100,18 @@ def _serialize_full_content(variable: WorkflowDraftVariable) -> dict | None: } +def _ensure_variable_access( + variable: WorkflowDraftVariable | None, + app_id: str, + variable_id: str, +) -> WorkflowDraftVariable: + if variable is None: + raise NotFoundError(description=f"variable not found, id={variable_id}") + if variable.app_id != app_id or variable.user_id != current_user.id: + raise NotFoundError(description=f"variable not found, id={variable_id}") + return variable + + _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = { "id": fields.String, "type": fields.String(attribute=lambda model: model.get_variable_type()), @@ -238,6 +250,7 @@ class WorkflowVariableCollectionApi(Resource): app_id=app_model.id, page=args.page, limit=args.limit, + user_id=current_user.id, ) return workflow_vars @@ -250,7 +263,7 @@ class WorkflowVariableCollectionApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) - draft_var_srv.delete_workflow_variables(app_model.id) + draft_var_srv.delete_user_workflow_variables(app_model.id, user_id=current_user.id) db.session.commit() return Response("", 204) @@ -287,7 +300,7 @@ class NodeVariableCollectionApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=session, ) - node_vars = draft_var_srv.list_node_variables(app_model.id, node_id) + node_vars = draft_var_srv.list_node_variables(app_model.id, node_id, user_id=current_user.id) return node_vars @@ -298,7 +311,7 @@ class NodeVariableCollectionApi(Resource): def delete(self, app_model: App, node_id: str): validate_node_id(node_id) srv = WorkflowDraftVariableService(db.session()) - srv.delete_node_variables(app_model.id, node_id) + srv.delete_node_variables(app_model.id, node_id, user_id=current_user.id) db.session.commit() return Response("", 204) @@ -319,11 +332,11 @@ class VariableApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) - variable = draft_var_srv.get_variable(variable_id=variable_id) - if variable is None: - raise NotFoundError(description=f"variable not found, id={variable_id}") - if variable.app_id != app_model.id: - raise NotFoundError(description=f"variable not found, id={variable_id}") + variable = _ensure_variable_access( + variable=draft_var_srv.get_variable(variable_id=variable_id), + app_id=app_model.id, + variable_id=variable_id, + ) return variable @console_ns.doc("update_variable") @@ -360,11 +373,11 @@ class VariableApi(Resource): ) args_model = WorkflowDraftVariableUpdatePayload.model_validate(console_ns.payload or {}) - variable = draft_var_srv.get_variable(variable_id=variable_id) - if variable is None: - raise NotFoundError(description=f"variable not found, id={variable_id}") - if variable.app_id != app_model.id: - raise NotFoundError(description=f"variable not found, id={variable_id}") + variable = _ensure_variable_access( + variable=draft_var_srv.get_variable(variable_id=variable_id), + app_id=app_model.id, + variable_id=variable_id, + ) new_name = args_model.name raw_value = args_model.value @@ -397,11 +410,11 @@ class VariableApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) - variable = draft_var_srv.get_variable(variable_id=variable_id) - if variable is None: - raise NotFoundError(description=f"variable not found, id={variable_id}") - if variable.app_id != app_model.id: - raise NotFoundError(description=f"variable not found, id={variable_id}") + variable = _ensure_variable_access( + variable=draft_var_srv.get_variable(variable_id=variable_id), + app_id=app_model.id, + variable_id=variable_id, + ) draft_var_srv.delete_variable(variable) db.session.commit() return Response("", 204) @@ -427,11 +440,11 @@ class VariableResetApi(Resource): raise NotFoundError( f"Draft workflow not found, app_id={app_model.id}", ) - variable = draft_var_srv.get_variable(variable_id=variable_id) - if variable is None: - raise NotFoundError(description=f"variable not found, id={variable_id}") - if variable.app_id != app_model.id: - raise NotFoundError(description=f"variable not found, id={variable_id}") + variable = _ensure_variable_access( + variable=draft_var_srv.get_variable(variable_id=variable_id), + app_id=app_model.id, + variable_id=variable_id, + ) resetted = draft_var_srv.reset_variable(draft_workflow, variable) db.session.commit() @@ -447,11 +460,15 @@ def _get_variable_list(app_model: App, node_id) -> WorkflowDraftVariableList: session=session, ) if node_id == CONVERSATION_VARIABLE_NODE_ID: - draft_vars = draft_var_srv.list_conversation_variables(app_model.id) + draft_vars = draft_var_srv.list_conversation_variables(app_model.id, user_id=current_user.id) elif node_id == SYSTEM_VARIABLE_NODE_ID: - draft_vars = draft_var_srv.list_system_variables(app_model.id) + draft_vars = draft_var_srv.list_system_variables(app_model.id, user_id=current_user.id) else: - draft_vars = draft_var_srv.list_node_variables(app_id=app_model.id, node_id=node_id) + draft_vars = draft_var_srv.list_node_variables( + app_id=app_model.id, + node_id=node_id, + user_id=current_user.id, + ) return draft_vars @@ -472,7 +489,7 @@ class ConversationVariableCollectionApi(Resource): if draft_workflow is None: raise NotFoundError(description=f"draft workflow not found, id={app_model.id}") draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(draft_workflow) + draft_var_srv.prefill_conversation_variable_default_values(draft_workflow, user_id=current_user.id) db.session.commit() return _get_variable_list(app_model, CONVERSATION_VARIABLE_NODE_ID) diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py index 4c441a5d07..c5dadb75f5 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_draft_variable.py @@ -102,6 +102,7 @@ class RagPipelineVariableCollectionApi(Resource): app_id=pipeline.id, page=query.page, limit=query.limit, + user_id=current_user.id, ) return workflow_vars @@ -111,7 +112,7 @@ class RagPipelineVariableCollectionApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=db.session(), ) - draft_var_srv.delete_workflow_variables(pipeline.id) + draft_var_srv.delete_user_workflow_variables(pipeline.id, user_id=current_user.id) db.session.commit() return Response("", 204) @@ -144,7 +145,7 @@ class RagPipelineNodeVariableCollectionApi(Resource): draft_var_srv = WorkflowDraftVariableService( session=session, ) - node_vars = draft_var_srv.list_node_variables(pipeline.id, node_id) + node_vars = draft_var_srv.list_node_variables(pipeline.id, node_id, user_id=current_user.id) return node_vars @@ -152,7 +153,7 @@ class RagPipelineNodeVariableCollectionApi(Resource): def delete(self, pipeline: Pipeline, node_id: str): validate_node_id(node_id) srv = WorkflowDraftVariableService(db.session()) - srv.delete_node_variables(pipeline.id, node_id) + srv.delete_node_variables(pipeline.id, node_id, user_id=current_user.id) db.session.commit() return Response("", 204) @@ -283,11 +284,11 @@ def _get_variable_list(pipeline: Pipeline, node_id) -> WorkflowDraftVariableList session=session, ) if node_id == CONVERSATION_VARIABLE_NODE_ID: - draft_vars = draft_var_srv.list_conversation_variables(pipeline.id) + draft_vars = draft_var_srv.list_conversation_variables(pipeline.id, user_id=current_user.id) elif node_id == SYSTEM_VARIABLE_NODE_ID: - draft_vars = draft_var_srv.list_system_variables(pipeline.id) + draft_vars = draft_var_srv.list_system_variables(pipeline.id, user_id=current_user.id) else: - draft_vars = draft_var_srv.list_node_variables(app_id=pipeline.id, node_id=node_id) + draft_vars = draft_var_srv.list_node_variables(app_id=pipeline.id, node_id=node_id, user_id=current_user.id) return draft_vars diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 05ae1a4d38..5d974335ff 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -330,9 +330,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) return self._generate( workflow=workflow, @@ -413,9 +414,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) return self._generate( workflow=workflow, diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index dcfc1415e8..19d67eb108 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -419,11 +419,12 @@ class PipelineGenerator(BaseAppGenerator): triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) var_loader = DraftVarLoader( engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) return self._generate( @@ -514,11 +515,12 @@ class PipelineGenerator(BaseAppGenerator): triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) var_loader = DraftVarLoader( engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) return self._generate( diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 32a7a3ccec..6fbe19a3b2 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -414,11 +414,12 @@ class WorkflowAppGenerator(BaseAppGenerator): triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) var_loader = DraftVarLoader( engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) return self._generate( @@ -497,11 +498,12 @@ class WorkflowAppGenerator(BaseAppGenerator): triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, ) draft_var_srv = WorkflowDraftVariableService(db.session()) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user.id) var_loader = DraftVarLoader( engine=db.engine, app_id=application_generate_entity.app_config.app_id, tenant_id=application_generate_entity.app_config.tenant_id, + user_id=user.id, ) return self._generate( app_model=app_model, diff --git a/api/migrations/versions/2026_03_04_1600-6b5f9f8b1a2c_add_user_id_to_workflow_draft_variables.py b/api/migrations/versions/2026_03_04_1600-6b5f9f8b1a2c_add_user_id_to_workflow_draft_variables.py new file mode 100644 index 0000000000..432e4dadf5 --- /dev/null +++ b/api/migrations/versions/2026_03_04_1600-6b5f9f8b1a2c_add_user_id_to_workflow_draft_variables.py @@ -0,0 +1,69 @@ +"""add user_id and switch workflow_draft_variables unique key to user scope + +Revision ID: 6b5f9f8b1a2c +Revises: 0ec65df55790 +Create Date: 2026-03-04 16:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +import models as models + +# revision identifiers, used by Alembic. +revision = "6b5f9f8b1a2c" +down_revision = "0ec65df55790" +branch_labels = None +depends_on = None + + +def _is_pg(conn) -> bool: + return conn.dialect.name == "postgresql" + + +def upgrade(): + conn = op.get_bind() + table_name = "workflow_draft_variables" + + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.add_column(sa.Column("user_id", models.types.StringUUID(), nullable=True)) + + if _is_pg(conn): + with op.get_context().autocommit_block(): + op.create_index( + "workflow_draft_variables_app_id_user_id_key", + "workflow_draft_variables", + ["app_id", "user_id", "node_id", "name"], + unique=True, + postgresql_concurrently=True, + ) + else: + op.create_index( + "workflow_draft_variables_app_id_user_id_key", + "workflow_draft_variables", + ["app_id", "user_id", "node_id", "name"], + unique=True, + ) + + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.drop_constraint(op.f("workflow_draft_variables_app_id_key"), type_="unique") + + +def downgrade(): + conn = op.get_bind() + + with op.batch_alter_table("workflow_draft_variables", schema=None) as batch_op: + batch_op.create_unique_constraint( + op.f("workflow_draft_variables_app_id_key"), + ["app_id", "node_id", "name"], + ) + + if _is_pg(conn): + with op.get_context().autocommit_block(): + op.drop_index("workflow_draft_variables_app_id_user_id_key", postgresql_concurrently=True) + else: + op.drop_index("workflow_draft_variables_app_id_user_id_key", table_name="workflow_draft_variables") + + with op.batch_alter_table("workflow_draft_variables", schema=None) as batch_op: + batch_op.drop_column("user_id") diff --git a/api/models/workflow.py b/api/models/workflow.py index fdb8de0653..32cbd50648 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1286,16 +1286,17 @@ class WorkflowDraftVariable(Base): """ @staticmethod - def unique_app_id_node_id_name() -> list[str]: + def unique_app_id_user_id_node_id_name() -> list[str]: return [ "app_id", + "user_id", "node_id", "name", ] __tablename__ = "workflow_draft_variables" __table_args__ = ( - UniqueConstraint(*unique_app_id_node_id_name()), + UniqueConstraint(*unique_app_id_user_id_node_id_name()), Index("workflow_draft_variable_file_id_idx", "file_id"), ) # Required for instance variable annotation. @@ -1321,6 +1322,11 @@ class WorkflowDraftVariable(Base): # "`app_id` maps to the `id` field in the `model.App` model." app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + # Owner of this draft variable. + # + # This field is nullable during migration and will be migrated to NOT NULL + # in a follow-up release. + user_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None) # `last_edited_at` records when the value of a given draft variable # is edited. @@ -1573,6 +1579,7 @@ class WorkflowDraftVariable(Base): cls, *, app_id: str, + user_id: str | None, node_id: str, name: str, value: Segment, @@ -1586,6 +1593,7 @@ class WorkflowDraftVariable(Base): variable.updated_at = naive_utc_now() variable.description = description variable.app_id = app_id + variable.user_id = user_id variable.node_id = node_id variable.name = name variable.set_value(value) @@ -1599,12 +1607,14 @@ class WorkflowDraftVariable(Base): cls, *, app_id: str, + user_id: str | None = None, name: str, value: Segment, description: str = "", ) -> "WorkflowDraftVariable": variable = cls._new( app_id=app_id, + user_id=user_id, node_id=CONVERSATION_VARIABLE_NODE_ID, name=name, value=value, @@ -1619,6 +1629,7 @@ class WorkflowDraftVariable(Base): cls, *, app_id: str, + user_id: str | None = None, name: str, value: Segment, node_execution_id: str, @@ -1626,6 +1637,7 @@ class WorkflowDraftVariable(Base): ) -> "WorkflowDraftVariable": variable = cls._new( app_id=app_id, + user_id=user_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, node_execution_id=node_execution_id, @@ -1639,6 +1651,7 @@ class WorkflowDraftVariable(Base): cls, *, app_id: str, + user_id: str | None = None, node_id: str, name: str, value: Segment, @@ -1649,6 +1662,7 @@ class WorkflowDraftVariable(Base): ) -> "WorkflowDraftVariable": variable = cls._new( app_id=app_id, + user_id=user_id, node_id=node_id, name=name, node_execution_id=node_execution_id, diff --git a/api/services/app_dsl_service.py b/api/services/app_dsl_service.py index 43bf6374b1..68cb3438ca 100644 --- a/api/services/app_dsl_service.py +++ b/api/services/app_dsl_service.py @@ -304,7 +304,7 @@ class AppDslService: ) draft_var_srv = WorkflowDraftVariableService(session=self._session) - draft_var_srv.delete_workflow_variables(app_id=app.id) + draft_var_srv.delete_app_workflow_variables(app_id=app.id) return Import( id=import_id, status=status, diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index 899a6ba378..2118043a98 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -472,6 +472,7 @@ class RagPipelineService: engine=db.engine, app_id=pipeline.id, tenant_id=pipeline.tenant_id, + user_id=account.id, ), ), start_at=start_at, @@ -1237,6 +1238,7 @@ class RagPipelineService: engine=db.engine, app_id=pipeline.id, tenant_id=pipeline.tenant_id, + user_id=current_user.id, ), ), start_at=start_at, diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 804bf28b66..fb1a3f30c0 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -77,6 +77,7 @@ class DraftVarLoader(VariableLoader): _engine: Engine # Application ID for which variables are being loaded. _app_id: str + _user_id: str _tenant_id: str _fallback_variables: Sequence[VariableBase] @@ -85,10 +86,12 @@ class DraftVarLoader(VariableLoader): engine: Engine, app_id: str, tenant_id: str, + user_id: str, fallback_variables: Sequence[VariableBase] | None = None, ): self._engine = engine self._app_id = app_id + self._user_id = user_id self._tenant_id = tenant_id self._fallback_variables = fallback_variables or [] @@ -104,7 +107,7 @@ class DraftVarLoader(VariableLoader): with Session(bind=self._engine, expire_on_commit=False) as session: srv = WorkflowDraftVariableService(session) - draft_vars = srv.get_draft_variables_by_selectors(self._app_id, selectors) + draft_vars = srv.get_draft_variables_by_selectors(self._app_id, selectors, user_id=self._user_id) # Important: files: list[File] = [] @@ -218,6 +221,7 @@ class WorkflowDraftVariableService: self, app_id: str, selectors: Sequence[list[str]], + user_id: str, ) -> list[WorkflowDraftVariable]: """ Retrieve WorkflowDraftVariable instances based on app_id and selectors. @@ -238,22 +242,30 @@ class WorkflowDraftVariableService: # Alternatively, a `SELECT` statement could be constructed for each selector and # combined using `UNION` to fetch all rows. # Benchmarking indicates that both approaches yield comparable performance. - variables = ( + query = ( self._session.query(WorkflowDraftVariable) .options( orm.selectinload(WorkflowDraftVariable.variable_file).selectinload( WorkflowDraftVariableFile.upload_file ) ) - .where(WorkflowDraftVariable.app_id == app_id, or_(*ors)) - .all() + .where( + WorkflowDraftVariable.app_id == app_id, + WorkflowDraftVariable.user_id == user_id, + or_(*ors), + ) ) - return variables + return query.all() - def list_variables_without_values(self, app_id: str, page: int, limit: int) -> WorkflowDraftVariableList: - criteria = WorkflowDraftVariable.app_id == app_id + def list_variables_without_values( + self, app_id: str, page: int, limit: int, user_id: str + ) -> WorkflowDraftVariableList: + criteria = [ + WorkflowDraftVariable.app_id == app_id, + WorkflowDraftVariable.user_id == user_id, + ] total = None - query = self._session.query(WorkflowDraftVariable).where(criteria) + query = self._session.query(WorkflowDraftVariable).where(*criteria) if page == 1: total = query.count() variables = ( @@ -269,11 +281,12 @@ class WorkflowDraftVariableService: return WorkflowDraftVariableList(variables=variables, total=total) - def _list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: - criteria = ( + def _list_node_variables(self, app_id: str, node_id: str, user_id: str) -> WorkflowDraftVariableList: + criteria = [ WorkflowDraftVariable.app_id == app_id, WorkflowDraftVariable.node_id == node_id, - ) + WorkflowDraftVariable.user_id == user_id, + ] query = self._session.query(WorkflowDraftVariable).where(*criteria) variables = ( query.options(orm.selectinload(WorkflowDraftVariable.variable_file)) @@ -282,36 +295,36 @@ class WorkflowDraftVariableService: ) return WorkflowDraftVariableList(variables=variables) - def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: - return self._list_node_variables(app_id, node_id) + def list_node_variables(self, app_id: str, node_id: str, user_id: str) -> WorkflowDraftVariableList: + return self._list_node_variables(app_id, node_id, user_id=user_id) - def list_conversation_variables(self, app_id: str) -> WorkflowDraftVariableList: - return self._list_node_variables(app_id, CONVERSATION_VARIABLE_NODE_ID) + def list_conversation_variables(self, app_id: str, user_id: str) -> WorkflowDraftVariableList: + return self._list_node_variables(app_id, CONVERSATION_VARIABLE_NODE_ID, user_id=user_id) - def list_system_variables(self, app_id: str) -> WorkflowDraftVariableList: - return self._list_node_variables(app_id, SYSTEM_VARIABLE_NODE_ID) + def list_system_variables(self, app_id: str, user_id: str) -> WorkflowDraftVariableList: + return self._list_node_variables(app_id, SYSTEM_VARIABLE_NODE_ID, user_id=user_id) - def get_conversation_variable(self, app_id: str, name: str) -> WorkflowDraftVariable | None: - return self._get_variable(app_id=app_id, node_id=CONVERSATION_VARIABLE_NODE_ID, name=name) + def get_conversation_variable(self, app_id: str, name: str, user_id: str) -> WorkflowDraftVariable | None: + return self._get_variable(app_id=app_id, node_id=CONVERSATION_VARIABLE_NODE_ID, name=name, user_id=user_id) - def get_system_variable(self, app_id: str, name: str) -> WorkflowDraftVariable | None: - return self._get_variable(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name) + def get_system_variable(self, app_id: str, name: str, user_id: str) -> WorkflowDraftVariable | None: + return self._get_variable(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, user_id=user_id) - def get_node_variable(self, app_id: str, node_id: str, name: str) -> WorkflowDraftVariable | None: - return self._get_variable(app_id, node_id, name) + def get_node_variable(self, app_id: str, node_id: str, name: str, user_id: str) -> WorkflowDraftVariable | None: + return self._get_variable(app_id, node_id, name, user_id=user_id) - def _get_variable(self, app_id: str, node_id: str, name: str) -> WorkflowDraftVariable | None: - variable = ( + def _get_variable(self, app_id: str, node_id: str, name: str, user_id: str) -> WorkflowDraftVariable | None: + return ( self._session.query(WorkflowDraftVariable) .options(orm.selectinload(WorkflowDraftVariable.variable_file)) .where( WorkflowDraftVariable.app_id == app_id, WorkflowDraftVariable.node_id == node_id, WorkflowDraftVariable.name == name, + WorkflowDraftVariable.user_id == user_id, ) .first() ) - return variable def update_variable( self, @@ -462,7 +475,17 @@ class WorkflowDraftVariableService: self._session.delete(upload_file) self._session.delete(variable) - def delete_workflow_variables(self, app_id: str): + def delete_user_workflow_variables(self, app_id: str, user_id: str): + ( + self._session.query(WorkflowDraftVariable) + .where( + WorkflowDraftVariable.app_id == app_id, + WorkflowDraftVariable.user_id == user_id, + ) + .delete(synchronize_session=False) + ) + + def delete_app_workflow_variables(self, app_id: str): ( self._session.query(WorkflowDraftVariable) .where(WorkflowDraftVariable.app_id == app_id) @@ -501,28 +524,35 @@ class WorkflowDraftVariableService: self._session.delete(upload_file) self._session.delete(variable_file) - def delete_node_variables(self, app_id: str, node_id: str): - return self._delete_node_variables(app_id, node_id) + def delete_node_variables(self, app_id: str, node_id: str, user_id: str): + return self._delete_node_variables(app_id, node_id, user_id=user_id) - def _delete_node_variables(self, app_id: str, node_id: str): - self._session.query(WorkflowDraftVariable).where( - WorkflowDraftVariable.app_id == app_id, - WorkflowDraftVariable.node_id == node_id, - ).delete() + def _delete_node_variables(self, app_id: str, node_id: str, user_id: str): + ( + self._session.query(WorkflowDraftVariable) + .where( + WorkflowDraftVariable.app_id == app_id, + WorkflowDraftVariable.node_id == node_id, + WorkflowDraftVariable.user_id == user_id, + ) + .delete(synchronize_session=False) + ) - def _get_conversation_id_from_draft_variable(self, app_id: str) -> str | None: + def _get_conversation_id_from_draft_variable(self, app_id: str, user_id: str) -> str | None: draft_var = self._get_variable( app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=str(SystemVariableKey.CONVERSATION_ID), + user_id=user_id, ) if draft_var is None: return None segment = draft_var.get_value() if not isinstance(segment, StringSegment): logger.warning( - "sys.conversation_id variable is not a string: app_id=%s, id=%s", + "sys.conversation_id variable is not a string: app_id=%s, user_id=%s, id=%s", app_id, + user_id, draft_var.id, ) return None @@ -543,7 +573,7 @@ class WorkflowDraftVariableService: If no such conversation exists, a new conversation is created and its ID is returned. """ - conv_id = self._get_conversation_id_from_draft_variable(workflow.app_id) + conv_id = self._get_conversation_id_from_draft_variable(workflow.app_id, account_id) if conv_id is not None: conversation = ( @@ -580,12 +610,13 @@ class WorkflowDraftVariableService: self._session.flush() return conversation.id - def prefill_conversation_variable_default_values(self, workflow: Workflow): + def prefill_conversation_variable_default_values(self, workflow: Workflow, user_id: str): """""" draft_conv_vars: list[WorkflowDraftVariable] = [] for conv_var in workflow.conversation_variables: draft_var = WorkflowDraftVariable.new_conversation_variable( app_id=workflow.app_id, + user_id=user_id, name=conv_var.name, value=conv_var, description=conv_var.description, @@ -635,7 +666,7 @@ def _batch_upsert_draft_variable( stmt = pg_insert(WorkflowDraftVariable).values([_model_to_insertion_dict(v) for v in draft_vars]) if policy == _UpsertPolicy.OVERWRITE: stmt = stmt.on_conflict_do_update( - index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(), + index_elements=WorkflowDraftVariable.unique_app_id_user_id_node_id_name(), set_={ # Refresh creation timestamp to ensure updated variables # appear first in chronologically sorted result sets. @@ -652,7 +683,9 @@ def _batch_upsert_draft_variable( }, ) elif policy == _UpsertPolicy.IGNORE: - stmt = stmt.on_conflict_do_nothing(index_elements=WorkflowDraftVariable.unique_app_id_node_id_name()) + stmt = stmt.on_conflict_do_nothing( + index_elements=WorkflowDraftVariable.unique_app_id_user_id_node_id_name() + ) else: stmt = mysql_insert(WorkflowDraftVariable).values([_model_to_insertion_dict(v) for v in draft_vars]) # type: ignore[assignment] if policy == _UpsertPolicy.OVERWRITE: @@ -682,6 +715,7 @@ def _model_to_insertion_dict(model: WorkflowDraftVariable) -> dict[str, Any]: d: dict[str, Any] = { "id": model.id, "app_id": model.app_id, + "user_id": model.user_id, "last_edited_at": None, "node_id": model.node_id, "name": model.name, @@ -807,6 +841,7 @@ class DraftVariableSaver: def _create_dummy_output_variable(self): return WorkflowDraftVariable.new_node_variable( app_id=self._app_id, + user_id=self._user.id, node_id=self._node_id, name=self._DUMMY_OUTPUT_IDENTITY, node_execution_id=self._node_execution_id, @@ -842,6 +877,7 @@ class DraftVariableSaver: draft_vars.append( WorkflowDraftVariable.new_conversation_variable( app_id=self._app_id, + user_id=self._user.id, name=item.name, value=segment, ) @@ -862,6 +898,7 @@ class DraftVariableSaver: draft_vars.append( WorkflowDraftVariable.new_node_variable( app_id=self._app_id, + user_id=self._user.id, node_id=self._node_id, name=name, node_execution_id=self._node_execution_id, @@ -884,6 +921,7 @@ class DraftVariableSaver: draft_vars.append( WorkflowDraftVariable.new_sys_variable( app_id=self._app_id, + user_id=self._user.id, name=name, node_execution_id=self._node_execution_id, value=value_seg, @@ -1019,6 +1057,7 @@ class DraftVariableSaver: # Create the draft variable draft_var = WorkflowDraftVariable.new_node_variable( app_id=self._app_id, + user_id=self._user.id, node_id=self._node_id, name=name, node_execution_id=self._node_execution_id, @@ -1032,6 +1071,7 @@ class DraftVariableSaver: # Create the draft variable draft_var = WorkflowDraftVariable.new_node_variable( app_id=self._app_id, + user_id=self._user.id, node_id=self._node_id, name=name, node_execution_id=self._node_execution_id, diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 319107d3fb..e13cdd5f27 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -697,7 +697,7 @@ class WorkflowService: with Session(bind=db.engine, expire_on_commit=False) as session, session.begin(): draft_var_srv = WorkflowDraftVariableService(session) - draft_var_srv.prefill_conversation_variable_default_values(draft_workflow) + draft_var_srv.prefill_conversation_variable_default_values(draft_workflow, user_id=account.id) node_config = draft_workflow.get_node_config_by_id(node_id) node_type = Workflow.get_node_type_from_node_config(node_config) @@ -740,6 +740,7 @@ class WorkflowService: engine=db.engine, app_id=app_model.id, tenant_id=app_model.tenant_id, + user_id=account.id, ) enclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config) @@ -831,6 +832,7 @@ class WorkflowService: workflow=draft_workflow, node_config=node_config, manual_inputs=inputs or {}, + user_id=account.id, ) node = self._build_human_input_node( workflow=draft_workflow, @@ -891,6 +893,7 @@ class WorkflowService: workflow=draft_workflow, node_config=node_config, manual_inputs=inputs or {}, + user_id=account.id, ) node = self._build_human_input_node( workflow=draft_workflow, @@ -967,6 +970,7 @@ class WorkflowService: workflow=draft_workflow, node_config=node_config, manual_inputs=inputs or {}, + user_id=account.id, ) node = self._build_human_input_node( workflow=draft_workflow, @@ -1102,10 +1106,11 @@ class WorkflowService: workflow: Workflow, node_config: NodeConfigDict, manual_inputs: Mapping[str, Any], + user_id: str, ) -> VariablePool: with Session(bind=db.engine, expire_on_commit=False) as session, session.begin(): draft_var_srv = WorkflowDraftVariableService(session) - draft_var_srv.prefill_conversation_variable_default_values(workflow) + draft_var_srv.prefill_conversation_variable_default_values(workflow, user_id=user_id) variable_pool = VariablePool( system_variables=SystemVariable.default(), @@ -1118,6 +1123,7 @@ class WorkflowService: engine=db.engine, app_id=app_model.id, tenant_id=app_model.tenant_id, + user_id=user_id, ) variable_mapping = HumanInputNode.extract_variable_selector_to_variable_mapping( graph_config=workflow.graph_dict, diff --git a/api/tests/integration_tests/services/test_workflow_draft_variable_service.py b/api/tests/integration_tests/services/test_workflow_draft_variable_service.py index b19b4ebdad..b6aeb54cca 100644 --- a/api/tests/integration_tests/services/test_workflow_draft_variable_service.py +++ b/api/tests/integration_tests/services/test_workflow_draft_variable_service.py @@ -30,6 +30,7 @@ from services.workflow_draft_variable_service import ( class TestWorkflowDraftVariableService(unittest.TestCase): _test_app_id: str _session: Session + _test_user_id: str _node1_id = "test_node_1" _node2_id = "test_node_2" _node_exec_id = str(uuid.uuid4()) @@ -99,13 +100,13 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test_list_variables(self): srv = self._get_test_srv() - var_list = srv.list_variables_without_values(self._test_app_id, page=1, limit=2) + var_list = srv.list_variables_without_values(self._test_app_id, page=1, limit=2, user_id=self._test_user_id) assert var_list.total == 5 assert len(var_list.variables) == 2 page1_var_ids = {v.id for v in var_list.variables} assert page1_var_ids.issubset(self._variable_ids) - var_list_2 = srv.list_variables_without_values(self._test_app_id, page=2, limit=2) + var_list_2 = srv.list_variables_without_values(self._test_app_id, page=2, limit=2, user_id=self._test_user_id) assert var_list_2.total is None assert len(var_list_2.variables) == 2 page2_var_ids = {v.id for v in var_list_2.variables} @@ -114,7 +115,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test_get_node_variable(self): srv = self._get_test_srv() - node_var = srv.get_node_variable(self._test_app_id, self._node1_id, "str_var") + node_var = srv.get_node_variable(self._test_app_id, self._node1_id, "str_var", user_id=self._test_user_id) assert node_var is not None assert node_var.id == self._node1_str_var_id assert node_var.name == "str_var" @@ -122,7 +123,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test_get_system_variable(self): srv = self._get_test_srv() - sys_var = srv.get_system_variable(self._test_app_id, "sys_var") + sys_var = srv.get_system_variable(self._test_app_id, "sys_var", user_id=self._test_user_id) assert sys_var is not None assert sys_var.id == self._sys_var_id assert sys_var.name == "sys_var" @@ -130,7 +131,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test_get_conversation_variable(self): srv = self._get_test_srv() - conv_var = srv.get_conversation_variable(self._test_app_id, "conv_var") + conv_var = srv.get_conversation_variable(self._test_app_id, "conv_var", user_id=self._test_user_id) assert conv_var is not None assert conv_var.id == self._conv_var_id assert conv_var.name == "conv_var" @@ -138,7 +139,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test_delete_node_variables(self): srv = self._get_test_srv() - srv.delete_node_variables(self._test_app_id, self._node2_id) + srv.delete_node_variables(self._test_app_id, self._node2_id, user_id=self._test_user_id) node2_var_count = ( self._session.query(WorkflowDraftVariable) .where( @@ -162,7 +163,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): def test__list_node_variables(self): srv = self._get_test_srv() - node_vars = srv._list_node_variables(self._test_app_id, self._node2_id) + node_vars = srv._list_node_variables(self._test_app_id, self._node2_id, user_id=self._test_user_id) assert len(node_vars.variables) == 2 assert {v.id for v in node_vars.variables} == set(self._node2_var_ids) @@ -173,7 +174,7 @@ class TestWorkflowDraftVariableService(unittest.TestCase): [self._node2_id, "str_var"], [self._node2_id, "int_var"], ] - variables = srv.get_draft_variables_by_selectors(self._test_app_id, selectors) + variables = srv.get_draft_variables_by_selectors(self._test_app_id, selectors, user_id=self._test_user_id) assert len(variables) == 3 assert {v.id for v in variables} == {self._node1_str_var_id} | set(self._node2_var_ids) @@ -206,19 +207,23 @@ class TestDraftVariableLoader(unittest.TestCase): def setUp(self): self._test_app_id = str(uuid.uuid4()) self._test_tenant_id = str(uuid.uuid4()) + self._test_user_id = str(uuid.uuid4()) sys_var = WorkflowDraftVariable.new_sys_variable( app_id=self._test_app_id, + user_id=self._test_user_id, name="sys_var", value=build_segment("sys_value"), node_execution_id=self._node_exec_id, ) conv_var = WorkflowDraftVariable.new_conversation_variable( app_id=self._test_app_id, + user_id=self._test_user_id, name="conv_var", value=build_segment("conv_value"), ) node_var = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id=self._node1_id, name="str_var", value=build_segment("str_value"), @@ -248,12 +253,22 @@ class TestDraftVariableLoader(unittest.TestCase): session.commit() def test_variable_loader_with_empty_selector(self): - var_loader = DraftVarLoader(engine=db.engine, app_id=self._test_app_id, tenant_id=self._test_tenant_id) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=self._test_app_id, + tenant_id=self._test_tenant_id, + user_id=self._test_user_id, + ) variables = var_loader.load_variables([]) assert len(variables) == 0 def test_variable_loader_with_non_empty_selector(self): - var_loader = DraftVarLoader(engine=db.engine, app_id=self._test_app_id, tenant_id=self._test_tenant_id) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=self._test_app_id, + tenant_id=self._test_tenant_id, + user_id=self._test_user_id, + ) variables = var_loader.load_variables( [ [SYSTEM_VARIABLE_NODE_ID, "sys_var"], @@ -296,7 +311,12 @@ class TestDraftVariableLoader(unittest.TestCase): session.commit() # Now test loading using DraftVarLoader - var_loader = DraftVarLoader(engine=db.engine, app_id=self._test_app_id, tenant_id=self._test_tenant_id) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=self._test_app_id, + tenant_id=self._test_tenant_id, + user_id=setup_account.id, + ) # Load the variable using the standard workflow variables = var_loader.load_variables([["test_offload_node", "offloaded_string_var"]]) @@ -313,7 +333,7 @@ class TestDraftVariableLoader(unittest.TestCase): # Clean up - delete all draft variables for this app with Session(bind=db.engine) as session: service = WorkflowDraftVariableService(session) - service.delete_workflow_variables(self._test_app_id) + service.delete_app_workflow_variables(self._test_app_id) session.commit() def test_load_offloaded_variable_object_type_integration(self): @@ -364,6 +384,7 @@ class TestDraftVariableLoader(unittest.TestCase): # Now create the offloaded draft variable with the correct file_id offloaded_var = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id="test_offload_node", name="offloaded_object_var", value=build_segment({"truncated": True}), @@ -379,7 +400,9 @@ class TestDraftVariableLoader(unittest.TestCase): # Use the service method that properly preloads relationships service = WorkflowDraftVariableService(session) draft_vars = service.get_draft_variables_by_selectors( - self._test_app_id, [["test_offload_node", "offloaded_object_var"]] + self._test_app_id, + [["test_offload_node", "offloaded_object_var"]], + user_id=self._test_user_id, ) assert len(draft_vars) == 1 @@ -387,7 +410,12 @@ class TestDraftVariableLoader(unittest.TestCase): assert loaded_var.is_truncated() # Create DraftVarLoader and test loading - var_loader = DraftVarLoader(engine=db.engine, app_id=self._test_app_id, tenant_id=self._test_tenant_id) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=self._test_app_id, + tenant_id=self._test_tenant_id, + user_id=self._test_user_id, + ) # Test the _load_offloaded_variable method selector_tuple, variable = var_loader._load_offloaded_variable(loaded_var) @@ -459,6 +487,7 @@ class TestDraftVariableLoader(unittest.TestCase): # Now create the offloaded draft variable with the correct file_id offloaded_var = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id="test_integration_node", name="offloaded_integration_var", value=build_segment("truncated"), @@ -473,7 +502,12 @@ class TestDraftVariableLoader(unittest.TestCase): # Test load_variables with both regular and offloaded variables # This method should handle the relationship preloading internally - var_loader = DraftVarLoader(engine=db.engine, app_id=self._test_app_id, tenant_id=self._test_tenant_id) + var_loader = DraftVarLoader( + engine=db.engine, + app_id=self._test_app_id, + tenant_id=self._test_tenant_id, + user_id=self._test_user_id, + ) variables = var_loader.load_variables( [ @@ -572,6 +606,7 @@ class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase): # Create test variables self._node_var_with_exec = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id=self._node_id, name="test_var", value=build_segment("old_value"), @@ -581,6 +616,7 @@ class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase): self._node_var_without_exec = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id=self._node_id, name="no_exec_var", value=build_segment("some_value"), @@ -591,6 +627,7 @@ class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase): self._node_var_missing_exec = WorkflowDraftVariable.new_node_variable( app_id=self._test_app_id, + user_id=self._test_user_id, node_id=self._node_id, name="missing_exec_var", value=build_segment("some_value"), @@ -599,6 +636,7 @@ class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase): self._conv_var = WorkflowDraftVariable.new_conversation_variable( app_id=self._test_app_id, + user_id=self._test_user_id, name="conv_var_1", value=build_segment("old_conv_value"), ) @@ -764,6 +802,7 @@ class TestWorkflowDraftVariableServiceResetVariable(unittest.TestCase): # Create a system variable sys_var = WorkflowDraftVariable.new_sys_variable( app_id=self._test_app_id, + user_id=self._test_user_id, name="sys_var", value=build_segment("sys_value"), node_execution_id=self._node_exec_id, diff --git a/api/tests/test_containers_integration_tests/services/test_workflow_draft_variable_service.py b/api/tests/test_containers_integration_tests/services/test_workflow_draft_variable_service.py index ab409deb89..572cf72fa0 100644 --- a/api/tests/test_containers_integration_tests/services/test_workflow_draft_variable_service.py +++ b/api/tests/test_containers_integration_tests/services/test_workflow_draft_variable_service.py @@ -122,6 +122,7 @@ class TestWorkflowDraftVariableService: name, value, variable_type: DraftVariableType = DraftVariableType.CONVERSATION, + user_id: str | None = None, fake=None, ): """ @@ -144,10 +145,15 @@ class TestWorkflowDraftVariableService: WorkflowDraftVariable: Created test variable instance with proper type configuration """ fake = fake or Faker() + if user_id is None: + app = db_session_with_containers.query(App).filter_by(id=app_id).first() + assert app is not None + user_id = app.created_by if variable_type == "conversation": # Create conversation variable using the appropriate factory method variable = WorkflowDraftVariable.new_conversation_variable( app_id=app_id, + user_id=user_id, name=name, value=value, description=fake.text(max_nb_chars=20), @@ -156,6 +162,7 @@ class TestWorkflowDraftVariableService: # Create system variable with editable flag and execution context variable = WorkflowDraftVariable.new_sys_variable( app_id=app_id, + user_id=user_id, name=name, value=value, node_execution_id=fake.uuid4(), @@ -165,6 +172,7 @@ class TestWorkflowDraftVariableService: # Create node variable with visibility and editability settings variable = WorkflowDraftVariable.new_node_variable( app_id=app_id, + user_id=user_id, node_id=node_id, name=name, value=value, @@ -189,7 +197,13 @@ class TestWorkflowDraftVariableService: app = self._create_test_app(db_session_with_containers, mock_external_service_dependencies, fake=fake) test_value = StringSegment(value=fake.word()) variable = self._create_test_variable( - db_session_with_containers, app.id, CONVERSATION_VARIABLE_NODE_ID, "test_var", test_value, fake=fake + db_session_with_containers, + app.id, + CONVERSATION_VARIABLE_NODE_ID, + "test_var", + test_value, + user_id=app.created_by, + fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) retrieved_variable = service.get_variable(variable.id) @@ -250,7 +264,7 @@ class TestWorkflowDraftVariableService: ["test_node_1", "var3"], ] service = WorkflowDraftVariableService(db_session_with_containers) - retrieved_variables = service.get_draft_variables_by_selectors(app.id, selectors) + retrieved_variables = service.get_draft_variables_by_selectors(app.id, selectors, user_id=app.created_by) assert len(retrieved_variables) == 3 var_names = [var.name for var in retrieved_variables] assert "var1" in var_names @@ -288,7 +302,7 @@ class TestWorkflowDraftVariableService: fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) - result = service.list_variables_without_values(app.id, page=1, limit=3) + result = service.list_variables_without_values(app.id, page=1, limit=3, user_id=app.created_by) assert result.total == 5 assert len(result.variables) == 3 assert result.variables[0].created_at >= result.variables[1].created_at @@ -339,7 +353,7 @@ class TestWorkflowDraftVariableService: fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) - result = service.list_node_variables(app.id, node_id) + result = service.list_node_variables(app.id, node_id, user_id=app.created_by) assert len(result.variables) == 2 for var in result.variables: assert var.node_id == node_id @@ -381,7 +395,7 @@ class TestWorkflowDraftVariableService: fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) - result = service.list_conversation_variables(app.id) + result = service.list_conversation_variables(app.id, user_id=app.created_by) assert len(result.variables) == 2 for var in result.variables: assert var.node_id == CONVERSATION_VARIABLE_NODE_ID @@ -559,7 +573,7 @@ class TestWorkflowDraftVariableService: assert len(app_variables) == 3 assert len(other_app_variables) == 1 service = WorkflowDraftVariableService(db_session_with_containers) - service.delete_workflow_variables(app.id) + service.delete_user_workflow_variables(app.id, user_id=app.created_by) app_variables_after = db_session_with_containers.query(WorkflowDraftVariable).filter_by(app_id=app.id).all() other_app_variables_after = ( db_session_with_containers.query(WorkflowDraftVariable).filter_by(app_id=other_app.id).all() @@ -567,6 +581,69 @@ class TestWorkflowDraftVariableService: assert len(app_variables_after) == 0 assert len(other_app_variables_after) == 1 + def test_draft_variables_are_isolated_between_users( + self, db_session_with_containers: Session, mock_external_service_dependencies + ): + """ + Test draft variable isolation for different users in the same app. + + This test verifies that: + 1. Query APIs return only variables owned by the target user. + 2. User-scoped deletion only removes variables for that user and keeps + other users' variables in the same app untouched. + """ + fake = Faker() + app = self._create_test_app(db_session_with_containers, mock_external_service_dependencies, fake=fake) + user_a = app.created_by + user_b = fake.uuid4() + + # Use identical variable names on purpose to verify uniqueness scope includes user_id. + self._create_test_variable( + db_session_with_containers, + app.id, + CONVERSATION_VARIABLE_NODE_ID, + "shared_name", + StringSegment(value="value_a"), + user_id=user_a, + fake=fake, + ) + self._create_test_variable( + db_session_with_containers, + app.id, + CONVERSATION_VARIABLE_NODE_ID, + "shared_name", + StringSegment(value="value_b"), + user_id=user_b, + fake=fake, + ) + self._create_test_variable( + db_session_with_containers, + app.id, + CONVERSATION_VARIABLE_NODE_ID, + "only_a", + StringSegment(value="only_a"), + user_id=user_a, + fake=fake, + ) + + service = WorkflowDraftVariableService(db_session_with_containers) + + user_a_vars = service.list_conversation_variables(app.id, user_id=user_a) + user_b_vars = service.list_conversation_variables(app.id, user_id=user_b) + assert {v.name for v in user_a_vars.variables} == {"shared_name", "only_a"} + assert {v.name for v in user_b_vars.variables} == {"shared_name"} + + service.delete_user_workflow_variables(app.id, user_id=user_a) + + user_a_remaining = ( + db_session_with_containers.query(WorkflowDraftVariable).filter_by(app_id=app.id, user_id=user_a).count() + ) + user_b_remaining = ( + db_session_with_containers.query(WorkflowDraftVariable).filter_by(app_id=app.id, user_id=user_b).count() + ) + assert user_a_remaining == 0 + assert user_b_remaining == 1 + def test_delete_node_variables_success( self, db_session_with_containers: Session, mock_external_service_dependencies ): @@ -627,7 +704,7 @@ class TestWorkflowDraftVariableService: assert len(other_node_variables) == 1 assert len(conv_variables) == 1 service = WorkflowDraftVariableService(db_session_with_containers) - service.delete_node_variables(app.id, node_id) + service.delete_node_variables(app.id, node_id, user_id=app.created_by) target_node_variables_after = ( db_session_with_containers.query(WorkflowDraftVariable).filter_by(app_id=app.id, node_id=node_id).all() ) @@ -675,7 +752,7 @@ class TestWorkflowDraftVariableService: db_session_with_containers.commit() service = WorkflowDraftVariableService(db_session_with_containers) - service.prefill_conversation_variable_default_values(workflow) + service.prefill_conversation_variable_default_values(workflow, user_id="00000000-0000-0000-0000-000000000001") draft_variables = ( db_session_with_containers.query(WorkflowDraftVariable) .filter_by(app_id=app.id, node_id=CONVERSATION_VARIABLE_NODE_ID) @@ -715,7 +792,7 @@ class TestWorkflowDraftVariableService: fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) - retrieved_conv_id = service._get_conversation_id_from_draft_variable(app.id) + retrieved_conv_id = service._get_conversation_id_from_draft_variable(app.id, app.created_by) assert retrieved_conv_id == conversation_id def test_get_conversation_id_from_draft_variable_not_found( @@ -731,7 +808,7 @@ class TestWorkflowDraftVariableService: fake = Faker() app = self._create_test_app(db_session_with_containers, mock_external_service_dependencies, fake=fake) service = WorkflowDraftVariableService(db_session_with_containers) - retrieved_conv_id = service._get_conversation_id_from_draft_variable(app.id) + retrieved_conv_id = service._get_conversation_id_from_draft_variable(app.id, app.created_by) assert retrieved_conv_id is None def test_list_system_variables_success( @@ -772,7 +849,7 @@ class TestWorkflowDraftVariableService: db_session_with_containers, app.id, CONVERSATION_VARIABLE_NODE_ID, "conv_var", conv_var_value, fake=fake ) service = WorkflowDraftVariableService(db_session_with_containers) - result = service.list_system_variables(app.id) + result = service.list_system_variables(app.id, user_id=app.created_by) assert len(result.variables) == 2 for var in result.variables: assert var.node_id == SYSTEM_VARIABLE_NODE_ID @@ -819,15 +896,15 @@ class TestWorkflowDraftVariableService: fake=fake, ) service = WorkflowDraftVariableService(db_session_with_containers) - retrieved_conv_var = service.get_conversation_variable(app.id, "test_conv_var") + retrieved_conv_var = service.get_conversation_variable(app.id, "test_conv_var", user_id=app.created_by) assert retrieved_conv_var is not None assert retrieved_conv_var.name == "test_conv_var" assert retrieved_conv_var.node_id == CONVERSATION_VARIABLE_NODE_ID - retrieved_sys_var = service.get_system_variable(app.id, "test_sys_var") + retrieved_sys_var = service.get_system_variable(app.id, "test_sys_var", user_id=app.created_by) assert retrieved_sys_var is not None assert retrieved_sys_var.name == "test_sys_var" assert retrieved_sys_var.node_id == SYSTEM_VARIABLE_NODE_ID - retrieved_node_var = service.get_node_variable(app.id, "test_node", "test_node_var") + retrieved_node_var = service.get_node_variable(app.id, "test_node", "test_node_var", user_id=app.created_by) assert retrieved_node_var is not None assert retrieved_node_var.name == "test_node_var" assert retrieved_node_var.node_id == "test_node" @@ -845,9 +922,14 @@ class TestWorkflowDraftVariableService: fake = Faker() app = self._create_test_app(db_session_with_containers, mock_external_service_dependencies, fake=fake) service = WorkflowDraftVariableService(db_session_with_containers) - retrieved_conv_var = service.get_conversation_variable(app.id, "non_existent_conv_var") + retrieved_conv_var = service.get_conversation_variable(app.id, "non_existent_conv_var", user_id=app.created_by) assert retrieved_conv_var is None - retrieved_sys_var = service.get_system_variable(app.id, "non_existent_sys_var") + retrieved_sys_var = service.get_system_variable(app.id, "non_existent_sys_var", user_id=app.created_by) assert retrieved_sys_var is None - retrieved_node_var = service.get_node_variable(app.id, "test_node", "non_existent_node_var") + retrieved_node_var = service.get_node_variable( + app.id, + "test_node", + "non_existent_node_var", + user_id=app.created_by, + ) assert retrieved_node_var is None diff --git a/api/tests/unit_tests/controllers/console/app/test_app_apis.py b/api/tests/unit_tests/controllers/console/app/test_app_apis.py index 074bbfab78..60b8ee96fe 100644 --- a/api/tests/unit_tests/controllers/console/app/test_app_apis.py +++ b/api/tests/unit_tests/controllers/console/app/test_app_apis.py @@ -398,6 +398,7 @@ class TestWorkflowDraftVariableEndpoints: method = _unwrap(api.get) monkeypatch.setattr(workflow_draft_variable_module, "db", SimpleNamespace(engine=MagicMock())) + monkeypatch.setattr(workflow_draft_variable_module, "current_user", SimpleNamespace(id="user-1")) class DummySession: def __enter__(self): diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py index e2618d960c..441d2fcd17 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_app_generator.py @@ -234,6 +234,7 @@ class TestAdvancedChatAppGeneratorInternals: captured: dict[str, object] = {} prefill_calls: list[object] = [] var_loader = SimpleNamespace(loader="draft") + workflow = SimpleNamespace(id="workflow-id") monkeypatch.setattr( "core.app.apps.advanced_chat.app_generator.AdvancedChatAppConfigManager.get_app_config", @@ -260,8 +261,8 @@ class TestAdvancedChatAppGeneratorInternals: def __init__(self, session): _ = session - def prefill_conversation_variable_default_values(self, workflow): - prefill_calls.append(workflow) + def prefill_conversation_variable_default_values(self, workflow, user_id): + prefill_calls.append((workflow, user_id)) monkeypatch.setattr("core.app.apps.advanced_chat.app_generator.WorkflowDraftVariableService", _DraftVarService) @@ -273,7 +274,7 @@ class TestAdvancedChatAppGeneratorInternals: result = generator.single_iteration_generate( app_model=SimpleNamespace(id="app", tenant_id="tenant"), - workflow=SimpleNamespace(id="workflow-id"), + workflow=workflow, node_id="node-1", user=SimpleNamespace(id="user-id"), args={"inputs": {"foo": "bar"}}, @@ -281,7 +282,7 @@ class TestAdvancedChatAppGeneratorInternals: ) assert result == {"ok": True} - assert prefill_calls + assert prefill_calls == [(workflow, "user-id")] assert captured["variable_loader"] is var_loader assert captured["application_generate_entity"].single_iteration_run.node_id == "node-1" @@ -291,6 +292,7 @@ class TestAdvancedChatAppGeneratorInternals: captured: dict[str, object] = {} prefill_calls: list[object] = [] var_loader = SimpleNamespace(loader="draft") + workflow = SimpleNamespace(id="workflow-id") monkeypatch.setattr( "core.app.apps.advanced_chat.app_generator.AdvancedChatAppConfigManager.get_app_config", @@ -317,8 +319,8 @@ class TestAdvancedChatAppGeneratorInternals: def __init__(self, session): _ = session - def prefill_conversation_variable_default_values(self, workflow): - prefill_calls.append(workflow) + def prefill_conversation_variable_default_values(self, workflow, user_id): + prefill_calls.append((workflow, user_id)) monkeypatch.setattr("core.app.apps.advanced_chat.app_generator.WorkflowDraftVariableService", _DraftVarService) @@ -330,7 +332,7 @@ class TestAdvancedChatAppGeneratorInternals: result = generator.single_loop_generate( app_model=SimpleNamespace(id="app", tenant_id="tenant"), - workflow=SimpleNamespace(id="workflow-id"), + workflow=workflow, node_id="node-2", user=SimpleNamespace(id="user-id"), args=SimpleNamespace(inputs={"foo": "bar"}), @@ -338,7 +340,7 @@ class TestAdvancedChatAppGeneratorInternals: ) assert result == {"ok": True} - assert prefill_calls + assert prefill_calls == [(workflow, "user-id")] assert captured["variable_loader"] is var_loader assert captured["application_generate_entity"].single_loop_run.node_id == "node-2" diff --git a/api/tests/unit_tests/services/test_app_dsl_service.py b/api/tests/unit_tests/services/test_app_dsl_service.py index b9ab03455d..4f7d184046 100644 --- a/api/tests/unit_tests/services/test_app_dsl_service.py +++ b/api/tests/unit_tests/services/test_app_dsl_service.py @@ -263,7 +263,7 @@ def test_import_app_completed_uses_declared_dependencies(monkeypatch): assert result.status == ImportStatus.COMPLETED assert result.app_id == "app-new" - draft_var_service.delete_workflow_variables.assert_called_once_with(app_id="app-new") + draft_var_service.delete_app_workflow_variables.assert_called_once_with(app_id="app-new") @pytest.mark.parametrize("has_workflow", [True, False]) @@ -305,7 +305,7 @@ def test_import_app_legacy_versions_extract_dependencies(monkeypatch, has_workfl account=_account_mock(), import_mode=ImportMode.YAML_CONTENT, yaml_content=_yaml_dump(data) ) assert result.status == ImportStatus.COMPLETED_WITH_WARNINGS - draft_var_service.delete_workflow_variables.assert_called_once_with(app_id="app-legacy") + draft_var_service.delete_app_workflow_variables.assert_called_once_with(app_id="app-legacy") def test_import_app_yaml_error_returns_failed(monkeypatch): diff --git a/api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py b/api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py index 1e0fdd788b..f3391d6380 100644 --- a/api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py +++ b/api/tests/unit_tests/services/workflow/test_draft_var_loader_simple.py @@ -24,7 +24,11 @@ class TestDraftVarLoaderSimple: def draft_var_loader(self, mock_engine): """Create DraftVarLoader instance for testing.""" return DraftVarLoader( - engine=mock_engine, app_id="test-app-id", tenant_id="test-tenant-id", fallback_variables=[] + engine=mock_engine, + app_id="test-app-id", + tenant_id="test-tenant-id", + user_id="test-user-id", + fallback_variables=[], ) def test_load_offloaded_variable_string_type_unit(self, draft_var_loader): @@ -323,7 +327,9 @@ class TestDraftVarLoaderSimple: # Verify service method was called mock_service.get_draft_variables_by_selectors.assert_called_once_with( - draft_var_loader._app_id, selectors + draft_var_loader._app_id, + selectors, + user_id=draft_var_loader._user_id, ) # Verify offloaded variable loading was called diff --git a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py index 9f3874b8f1..0c2be9c79f 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py @@ -8,7 +8,7 @@ from sqlalchemy import Engine from sqlalchemy.orm import Session from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID -from dify_graph.enums import BuiltinNodeTypes +from dify_graph.enums import BuiltinNodeTypes, SystemVariableKey from dify_graph.variables.segments import StringSegment from dify_graph.variables.types import SegmentType from libs.uuid_utils import uuidv7 @@ -182,6 +182,42 @@ class TestDraftVariableSaver: draft_vars = mock_batch_upsert.call_args[0][1] assert len(draft_vars) == 2 + @patch("services.workflow_draft_variable_service._batch_upsert_draft_variable", autospec=True) + def test_start_node_save_persists_sys_timestamp_and_workflow_run_id(self, mock_batch_upsert): + """Start node should persist common `sys.*` variables, not only `sys.files`.""" + mock_session = MagicMock(spec=Session) + mock_user = MagicMock(spec=Account) + mock_user.id = "test-user-id" + mock_user.tenant_id = "test-tenant-id" + + saver = DraftVariableSaver( + session=mock_session, + app_id="test-app-id", + node_id="start-node-id", + node_type=BuiltinNodeTypes.START, + node_execution_id="exec-id", + user=mock_user, + ) + + outputs = { + f"{SYSTEM_VARIABLE_NODE_ID}.{SystemVariableKey.TIMESTAMP}": 1700000000, + f"{SYSTEM_VARIABLE_NODE_ID}.{SystemVariableKey.WORKFLOW_EXECUTION_ID}": "run-id-123", + } + + saver.save(outputs=outputs) + + mock_batch_upsert.assert_called_once() + draft_vars = mock_batch_upsert.call_args[0][1] + + # plus one dummy output because there are no non-sys Start inputs + assert len(draft_vars) == 3 + + sys_vars = [v for v in draft_vars if v.node_id == SYSTEM_VARIABLE_NODE_ID] + assert {v.name for v in sys_vars} == { + str(SystemVariableKey.TIMESTAMP), + str(SystemVariableKey.WORKFLOW_EXECUTION_ID), + } + class TestWorkflowDraftVariableService: def _get_test_app_id(self): diff --git a/api/tests/unit_tests/services/workflow/test_workflow_service.py b/api/tests/unit_tests/services/workflow/test_workflow_service.py index ed26bcec01..538c1b3595 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_service.py @@ -245,6 +245,7 @@ class TestWorkflowService: workflow=workflow, node_config=node_config, manual_inputs={"#node-0.result#": "LLM output"}, + user_id="account-1", ) node.render_form_content_with_outputs.assert_called_once()