From 58dfe2ca038e71affbd1e6fe2de7975a4abbc04a Mon Sep 17 00:00:00 2001 From: Joel Date: Wed, 2 Jul 2025 10:55:38 +0800 Subject: [PATCH 1/6] fix: when config plugin endpoint choose no start form app cause page crashed (#21789) --- .../app-selector/app-inputs-panel.tsx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/web/app/components/plugins/plugin-detail-panel/app-selector/app-inputs-panel.tsx b/web/app/components/plugins/plugin-detail-panel/app-selector/app-inputs-panel.tsx index b22f59fe2c..d3ac9d7d2e 100644 --- a/web/app/components/plugins/plugin-detail-panel/app-selector/app-inputs-panel.tsx +++ b/web/app/components/plugins/plugin-detail-panel/app-selector/app-inputs-panel.tsx @@ -62,7 +62,7 @@ const AppInputsPanel = ({ return [] let inputFormSchema = [] if (isBasicApp) { - inputFormSchema = currentApp.model_config.user_input_form.filter((item: any) => !item.external_data_tool).map((item: any) => { + inputFormSchema = currentApp.model_config?.user_input_form?.filter((item: any) => !item.external_data_tool).map((item: any) => { if (item.paragraph) { return { ...item.paragraph, @@ -108,10 +108,10 @@ const AppInputsPanel = ({ type: 'text-input', required: false, } - }) + }) || [] } else { - const startNode = currentWorkflow?.graph.nodes.find(node => node.data.type === BlockEnum.Start) as any + const startNode = currentWorkflow?.graph?.nodes.find(node => node.data.type === BlockEnum.Start) as any inputFormSchema = startNode?.data.variables.map((variable: any) => { if (variable.type === InputVarType.multiFiles) { return { @@ -132,7 +132,7 @@ const AppInputsPanel = ({ ...variable, required: false, } - }) + }) || [] } if ((currentApp.mode === 'completion' || currentApp.mode === 'workflow') && basicAppFileConfig.enabled) { inputFormSchema.push({ @@ -144,7 +144,7 @@ const AppInputsPanel = ({ fileUploadConfig, }) } - return inputFormSchema + return inputFormSchema || [] }, [basicAppFileConfig, currentApp, currentWorkflow, fileUploadConfig, 
isBasicApp]) const handleFormChange = (value: Record) => { From f53b177e1ff13ec0779c1ebf36fc7a42d762f72f Mon Sep 17 00:00:00 2001 From: Joel Date: Wed, 2 Jul 2025 11:07:43 +0800 Subject: [PATCH 2/6] chore: new inspected variable add to top position instead of bottom (#21793) --- web/app/components/workflow/hooks/use-inspect-vars-crud.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/app/components/workflow/hooks/use-inspect-vars-crud.ts b/web/app/components/workflow/hooks/use-inspect-vars-crud.ts index e81d3b13a4..e4daaf8220 100644 --- a/web/app/components/workflow/hooks/use-inspect-vars-crud.ts +++ b/web/app/components/workflow/hooks/use-inspect-vars-crud.ts @@ -117,7 +117,7 @@ const useInspectVarsCrud = () => { if (nodeInfo) { const index = draft.findIndex(node => node.nodeId === nodeId) if (index === -1) { - draft.push({ + draft.unshift({ nodeId, nodeType: nodeInfo.data.type, title: nodeInfo.data.title, From 86179beaa554e62e1ca9550c0f65cb432e641ec4 Mon Sep 17 00:00:00 2001 From: ShadowJobs <794878115@qq.com> Date: Wed, 2 Jul 2025 11:32:23 +0800 Subject: [PATCH 3/6] =?UTF-8?q?FIX:=20dollar-sign=20escaping=20in=20prepro?= =?UTF-8?q?cessLaTeX=20code=E2=80=90block=20handling=20(#21796)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: LinYing --- web/app/components/base/markdown/markdown-utils.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/web/app/components/base/markdown/markdown-utils.ts b/web/app/components/base/markdown/markdown-utils.ts index dc3c7a9784..209fcd0b32 100644 --- a/web/app/components/base/markdown/markdown-utils.ts +++ b/web/app/components/base/markdown/markdown-utils.ts @@ -11,6 +11,7 @@ export const preprocessLaTeX = (content: string) => { const codeBlockRegex = /```[\s\S]*?```/g const codeBlocks = content.match(codeBlockRegex) || [] + const escapeReplacement = (str: string) => str.replace(/\$/g, '_TMP_REPLACE_DOLLAR_') let processedContent 
= content.replace(codeBlockRegex, 'CODE_BLOCK_PLACEHOLDER') processedContent = flow([ @@ -21,9 +22,11 @@ export const preprocessLaTeX = (content: string) => { ])(processedContent) codeBlocks.forEach((block) => { - processedContent = processedContent.replace('CODE_BLOCK_PLACEHOLDER', block) + processedContent = processedContent.replace('CODE_BLOCK_PLACEHOLDER', escapeReplacement(block)) }) + processedContent = processedContent.replace(/_TMP_REPLACE_DOLLAR_/g, '$') + return processedContent } From 71d6cf1b1d3040a6733b04a875a47f0d77a436eb Mon Sep 17 00:00:00 2001 From: jiangbo721 <365065261@qq.com> Date: Wed, 2 Jul 2025 12:04:33 +0800 Subject: [PATCH 4/6] fix: Make the latency and logs of web applications consistent. (#21578) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 刘江波 --- .../task_pipeline/easy_ui_based_generate_task_pipeline.py | 1 + api/core/workflow/workflow_cycle_manager.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index d535e1f835..3c8c7bb5a2 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -395,6 +395,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): message.provider_response_latency = time.perf_counter() - self._start_at message.total_price = usage.total_price message.currency = usage.currency + self._task_state.llm_result.usage.latency = message.provider_response_latency message.message_metadata = self._task_state.metadata.model_dump_json() if trace_manager: diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 6ee562fc8d..0aab2426af 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -27,6 
+27,7 @@ from core.workflow.enums import SystemVariableKey from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_entry import WorkflowEntry +from libs.datetime_utils import naive_utc_now @dataclass @@ -160,12 +161,13 @@ class WorkflowCycleManager: exceptions_count: int = 0, ) -> WorkflowExecution: workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id) + now = naive_utc_now() workflow_execution.status = WorkflowExecutionStatus(status.value) workflow_execution.error_message = error_message workflow_execution.total_tokens = total_tokens workflow_execution.total_steps = total_steps - workflow_execution.finished_at = datetime.now(UTC).replace(tzinfo=None) + workflow_execution.finished_at = now workflow_execution.exceptions_count = exceptions_count # Use the instance repository to find running executions for a workflow run @@ -174,7 +176,6 @@ class WorkflowCycleManager: ) # Update the domain models - now = datetime.now(UTC).replace(tzinfo=None) for node_execution in running_node_executions: if node_execution.node_execution_id: # Update the domain model From c2e599cd85ef10e941badc5621c81505bd587de6 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 2 Jul 2025 13:36:35 +0800 Subject: [PATCH 5/6] fix(api): Fix resetting sys var causing internal server error (#21604) and sorts draft variables by their creation time, ensuring a consistent order.
--- .../app/apps/advanced_chat/app_generator.py | 11 +- .../advanced_chat/generate_task_pipeline.py | 18 ++ api/core/app/apps/base_app_generator.py | 47 ++- api/core/app/apps/workflow/app_generator.py | 8 + .../apps/workflow/generate_task_pipeline.py | 20 ++ api/core/app/apps/workflow_app_runner.py | 35 --- .../entities/workflow_node_execution.py | 16 +- .../workflow/nodes/http_request/executor.py | 2 +- .../repositories/draft_variable_repository.py | 32 ++ .../workflow_draft_variable_service.py | 63 ++-- api/services/workflow_service.py | 2 - .../test_workflow_draft_variable_service.py | 287 +++++++++++++----- 12 files changed, 401 insertions(+), 140 deletions(-) create mode 100644 api/core/workflow/repositories/draft_variable_repository.py diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 61de9ec670..7877408cef 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -27,6 +27,9 @@ from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repositories.draft_variable_repository import ( + DraftVariableSaverFactory, +) from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader @@ -36,7 +39,10 @@ from libs.flask_utils import preserve_flask_contexts from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom from models.enums import WorkflowRunTriggeredFrom from 
services.conversation_service import ConversationService -from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService +from services.workflow_draft_variable_service import ( + DraftVarLoader, + WorkflowDraftVariableService, +) logger = logging.getLogger(__name__) @@ -450,6 +456,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, + draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from), ) return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from) @@ -521,6 +528,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): user: Union[Account, EndUser], workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, stream: bool = False, ) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: """ @@ -547,6 +555,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, stream=stream, + draft_var_saver_factory=draft_var_saver_factory, ) try: diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 8c5645bbb7..4c52fc3e83 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -64,6 +64,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, W from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType +from 
core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager @@ -94,6 +95,7 @@ class AdvancedChatAppGenerateTaskPipeline: dialogue_count: int, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, @@ -153,6 +155,7 @@ class AdvancedChatAppGenerateTaskPipeline: self._conversation_name_generate_thread: Thread | None = None self._recorded_files: list[Mapping[str, Any]] = [] self._workflow_run_id: str = "" + self._draft_var_saver_factory = draft_var_saver_factory def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]: """ @@ -371,6 +374,7 @@ class AdvancedChatAppGenerateTaskPipeline: workflow_node_execution=workflow_node_execution, ) session.commit() + self._save_output_for_event(event, workflow_node_execution.id) if node_finish_resp: yield node_finish_resp @@ -390,6 +394,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) if node_finish_resp: yield node_finish_resp @@ -759,3 +765,15 @@ class AdvancedChatAppGenerateTaskPipeline: if not message: raise ValueError(f"Message not found: {self._message_id}") return message + + def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str): + with 
Session(db.engine) as session, session.begin(): + saver = self._draft_var_saver_factory( + session=session, + app_id=self._application_generate_entity.app_config.app_id, + node_id=event.node_id, + node_type=event.node_type, + node_execution_id=node_execution_id, + enclosing_node_id=event.in_loop_id or event.in_iteration_id, + ) + saver.save(event.process_data, event.outputs) diff --git a/api/core/app/apps/base_app_generator.py b/api/core/app/apps/base_app_generator.py index a83b75cc1a..beece1d77e 100644 --- a/api/core/app/apps/base_app_generator.py +++ b/api/core/app/apps/base_app_generator.py @@ -1,10 +1,20 @@ import json from collections.abc import Generator, Mapping, Sequence -from typing import TYPE_CHECKING, Any, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union, final + +from sqlalchemy.orm import Session from core.app.app_config.entities import VariableEntityType +from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File, FileUploadConfig +from core.workflow.nodes.enums import NodeType +from core.workflow.repositories.draft_variable_repository import ( + DraftVariableSaver, + DraftVariableSaverFactory, + NoopDraftVariableSaver, +) from factories import file_factory +from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl if TYPE_CHECKING: from core.app.app_config.entities import VariableEntity @@ -159,3 +169,38 @@ class BaseAppGenerator: yield f"event: {message}\n\n" return gen() + + @final + @staticmethod + def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory: + if invoke_from == InvokeFrom.DEBUGGER: + + def draft_var_saver_factory( + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> DraftVariableSaver: + return DraftVariableSaverImpl( + session=session, + app_id=app_id, + node_id=node_id, + node_type=node_type, + 
node_execution_id=node_execution_id, + enclosing_node_id=enclosing_node_id, + ) + else: + + def draft_var_saver_factory( + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> DraftVariableSaver: + return NoopDraftVariableSaver() + + return draft_var_saver_factory diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index a18139e644..40a1e272a7 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -25,6 +25,7 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader @@ -236,6 +237,10 @@ class WorkflowAppGenerator(BaseAppGenerator): worker_thread.start() + draft_var_saver_factory = self._get_draft_var_saver_factory( + invoke_from, + ) + # return response or stream generator response = self._handle_response( application_generate_entity=application_generate_entity, @@ -244,6 +249,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + draft_var_saver_factory=draft_var_saver_factory, stream=streaming, ) @@ -474,6 +480,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user: Union[Account, EndUser], 
workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, stream: bool = False, ) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: """ @@ -494,6 +501,7 @@ class WorkflowAppGenerator(BaseAppGenerator): user=user, workflow_execution_repository=workflow_execution_repository, workflow_node_execution_repository=workflow_node_execution_repository, + draft_var_saver_factory=draft_var_saver_factory, stream=stream, ) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 1734dbb598..2a85cd5e3d 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -56,6 +56,7 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.ops.ops_trace_manager import TraceQueueManager from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.enums import SystemVariableKey +from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager @@ -87,6 +88,7 @@ class WorkflowAppGenerateTaskPipeline: stream: bool, workflow_execution_repository: WorkflowExecutionRepository, workflow_node_execution_repository: WorkflowNodeExecutionRepository, + draft_var_saver_factory: DraftVariableSaverFactory, ) -> None: self._base_task_pipeline = BasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, @@ -131,6 +133,8 @@ class WorkflowAppGenerateTaskPipeline: 
self._application_generate_entity = application_generate_entity self._workflow_features_dict = workflow.features_dict self._workflow_run_id = "" + self._invoke_from = queue_manager._invoke_from + self._draft_var_saver_factory = draft_var_saver_factory def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]: """ @@ -322,6 +326,8 @@ class WorkflowAppGenerateTaskPipeline: workflow_node_execution=workflow_node_execution, ) + self._save_output_for_event(event, workflow_node_execution.id) + if node_success_response: yield node_success_response elif isinstance( @@ -339,6 +345,8 @@ class WorkflowAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) + if isinstance(event, QueueNodeExceptionEvent): + self._save_output_for_event(event, workflow_node_execution.id) if node_failed_response: yield node_failed_response @@ -593,3 +601,15 @@ class WorkflowAppGenerateTaskPipeline: ) return response + + def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str): + with Session(db.engine) as session, session.begin(): + saver = self._draft_var_saver_factory( + session=session, + app_id=self._application_generate_entity.app_config.app_id, + node_id=event.node_id, + node_type=event.node_type, + node_execution_id=node_execution_id, + enclosing_node_id=event.in_loop_id or event.in_iteration_id, + ) + saver.save(event.process_data, event.outputs) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index dc6c381e86..17b9ac5827 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -1,8 +1,6 @@ from collections.abc import Mapping from typing import Any, Optional, cast -from sqlalchemy.orm import Session - from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import 
AppRunner from core.app.entities.queue_entities import ( @@ -35,7 +33,6 @@ from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.graph_engine.entities.event import ( AgentLogEvent, - BaseNodeEvent, GraphEngineEvent, GraphRunFailedEvent, GraphRunPartialSucceededEvent, @@ -70,9 +67,6 @@ from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from models.model import App from models.workflow import Workflow -from services.workflow_draft_variable_service import ( - DraftVariableSaver, -) class WorkflowBasedAppRunner(AppRunner): @@ -400,7 +394,6 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) - self._save_draft_var_for_event(event) elif isinstance(event, NodeRunFailedEvent): self._publish_event( @@ -464,7 +457,6 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) - self._save_draft_var_for_event(event) elif isinstance(event, NodeInIterationFailedEvent): self._publish_event( @@ -718,30 +710,3 @@ class WorkflowBasedAppRunner(AppRunner): def _publish_event(self, event: AppQueueEvent) -> None: self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER) - - def _save_draft_var_for_event(self, event: BaseNodeEvent): - run_result = event.route_node_state.node_run_result - if run_result is None: - return - process_data = run_result.process_data - outputs = run_result.outputs - with Session(bind=db.engine) as session, session.begin(): - draft_var_saver = DraftVariableSaver( - session=session, - app_id=self._get_app_id(), - node_id=event.node_id, - node_type=event.node_type, - # FIXME(QuantumGhost): rely on private state of queue_manager is not ideal. 
- invoke_from=self.queue_manager._invoke_from, - node_execution_id=event.id, - enclosing_node_id=event.in_loop_id or event.in_iteration_id or None, - ) - draft_var_saver.save(process_data=process_data, outputs=outputs) - - -def _remove_first_element_from_variable_string(key: str) -> str: - """ - Remove the first element from the prefix. - """ - prefix, remaining = key.split(".", maxsplit=1) - return remaining diff --git a/api/core/workflow/entities/workflow_node_execution.py b/api/core/workflow/entities/workflow_node_execution.py index 773f5b777b..09a408f4d7 100644 --- a/api/core/workflow/entities/workflow_node_execution.py +++ b/api/core/workflow/entities/workflow_node_execution.py @@ -66,11 +66,21 @@ class WorkflowNodeExecution(BaseModel): but they are not stored in the model. """ - # Core identification fields - id: str # Unique identifier for this execution record - node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing + # --------- Core identification fields --------- + + # Unique identifier for this execution record, used when persisting to storage. + # Value is a UUID string (e.g., '09b3e04c-f9ae-404c-ad82-290b8d7bd382'). + id: str + + # Optional secondary ID for cross-referencing purposes. + # + # NOTE: For referencing the persisted record, use `id` rather than `node_execution_id`. + # While `node_execution_id` may sometimes be a UUID string, this is not guaranteed. + # In most scenarios, `id` should be used as the primary identifier. 
+ node_execution_id: Optional[str] = None workflow_id: str # ID of the workflow this node belongs to workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging) + # --------- Core identification fields ends --------- # Execution positioning and flow index: int # Sequence number for ordering in trace visualization diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 2c83b00d4a..b0a14229c5 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -333,7 +333,7 @@ class Executor: try: response = getattr(ssrf_proxy, self.method.lower())(**request_args) except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: - raise HttpRequestNodeError(str(e)) + raise HttpRequestNodeError(str(e)) from e # FIXME: fix type ignore, this maybe httpx type issue return response # type: ignore diff --git a/api/core/workflow/repositories/draft_variable_repository.py b/api/core/workflow/repositories/draft_variable_repository.py new file mode 100644 index 0000000000..cadc23f845 --- /dev/null +++ b/api/core/workflow/repositories/draft_variable_repository.py @@ -0,0 +1,32 @@ +import abc +from collections.abc import Mapping +from typing import Any, Protocol + +from sqlalchemy.orm import Session + +from core.workflow.nodes.enums import NodeType + + +class DraftVariableSaver(Protocol): + @abc.abstractmethod + def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None): + pass + + +class DraftVariableSaverFactory(Protocol): + @abc.abstractmethod + def __call__( + self, + session: Session, + app_id: str, + node_id: str, + node_type: NodeType, + node_execution_id: str, + enclosing_node_id: str | None = None, + ) -> "DraftVariableSaver": + pass + + +class NoopDraftVariableSaver(DraftVariableSaver): + def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | 
None): + pass diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 164693c2e1..44fd72b5e4 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -154,7 +154,7 @@ class WorkflowDraftVariableService: variables = ( # Do not load the `value` field. query.options(orm.defer(WorkflowDraftVariable.value)) - .order_by(WorkflowDraftVariable.id.desc()) + .order_by(WorkflowDraftVariable.created_at.desc()) .limit(limit) .offset((page - 1) * limit) .all() @@ -168,7 +168,7 @@ class WorkflowDraftVariableService: WorkflowDraftVariable.node_id == node_id, ) query = self._session.query(WorkflowDraftVariable).filter(*criteria) - variables = query.order_by(WorkflowDraftVariable.id.desc()).all() + variables = query.order_by(WorkflowDraftVariable.created_at.desc()).all() return WorkflowDraftVariableList(variables=variables) def list_node_variables(self, app_id: str, node_id: str) -> WorkflowDraftVariableList: @@ -235,7 +235,9 @@ class WorkflowDraftVariableService: self._session.flush() return variable - def _reset_node_var(self, workflow: Workflow, variable: WorkflowDraftVariable) -> WorkflowDraftVariable | None: + def _reset_node_var_or_sys_var( + self, workflow: Workflow, variable: WorkflowDraftVariable + ) -> WorkflowDraftVariable | None: # If a variable does not allow updating, it makes no sence to resetting it. if not variable.editable: return variable @@ -259,28 +261,35 @@ class WorkflowDraftVariableService: self._session.flush() return None - # Get node type for proper value extraction - node_config = workflow.get_node_config_by_id(variable.node_id) - node_type = workflow.get_node_type_from_node_config(node_config) - outputs_dict = node_exec.outputs_dict or {} + # a sentinel value used to check the absent of the output variable key. 
+ absent = object() - # Note: Based on the implementation in `_build_from_variable_assigner_mapping`, - # VariableAssignerNode (both v1 and v2) can only create conversation draft variables. - # For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes. - # - # This implementation must remain synchronized with the `_build_from_variable_assigner_mapping` - # and `save` methods. - if node_type == NodeType.VARIABLE_ASSIGNER: - return variable + if variable.get_variable_type() == DraftVariableType.NODE: + # Get node type for proper value extraction + node_config = workflow.get_node_config_by_id(variable.node_id) + node_type = workflow.get_node_type_from_node_config(node_config) - if variable.name not in outputs_dict: + # Note: Based on the implementation in `_build_from_variable_assigner_mapping`, + # VariableAssignerNode (both v1 and v2) can only create conversation draft variables. + # For consistency, we should simply return when processing VARIABLE_ASSIGNER nodes. + # + # This implementation must remain synchronized with the `_build_from_variable_assigner_mapping` + # and `save` methods. + if node_type == NodeType.VARIABLE_ASSIGNER: + return variable + output_value = outputs_dict.get(variable.name, absent) + else: + output_value = outputs_dict.get(f"sys.{variable.name}", absent) + + # We cannot use `is None` to check the existence of an output variable here as + # the value of the output may be `None`. 
+ if output_value is absent: # If variable not found in execution data, delete the variable self._session.delete(instance=variable) self._session.flush() return None - value = outputs_dict[variable.name] - value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, value) + value_seg = WorkflowDraftVariable.build_segment_with_type(variable.value_type, output_value) # Extract variable value using unified logic variable.set_value(value_seg) variable.last_edited_at = None # Reset to indicate this is a reset operation @@ -291,10 +300,8 @@ class WorkflowDraftVariableService: variable_type = variable.get_variable_type() if variable_type == DraftVariableType.CONVERSATION: return self._reset_conv_var(workflow, variable) - elif variable_type == DraftVariableType.NODE: - return self._reset_node_var(workflow, variable) else: - raise VariableResetError(f"cannot reset system variable, variable_id={variable.id}") + return self._reset_node_var_or_sys_var(workflow, variable) def delete_variable(self, variable: WorkflowDraftVariable): self._session.delete(variable) @@ -439,6 +446,9 @@ def _batch_upsert_draft_varaible( stmt = stmt.on_conflict_do_update( index_elements=WorkflowDraftVariable.unique_app_id_node_id_name(), set_={ + # Refresh creation timestamp to ensure updated variables + # appear first in chronologically sorted result sets. + "created_at": stmt.excluded.created_at, "updated_at": stmt.excluded.updated_at, "last_edited_at": stmt.excluded.last_edited_at, "description": stmt.excluded.description, @@ -525,9 +535,6 @@ class DraftVariableSaver: # The type of the current node (see NodeType). _node_type: NodeType - # Indicates how the workflow execution was triggered (see InvokeFrom). 
- _invoke_from: InvokeFrom - # _node_execution_id: str @@ -546,15 +553,16 @@ class DraftVariableSaver: app_id: str, node_id: str, node_type: NodeType, - invoke_from: InvokeFrom, node_execution_id: str, enclosing_node_id: str | None = None, ): + # Important: `node_execution_id` parameter refers to the primary key (`id`) of the + # WorkflowNodeExecutionModel/WorkflowNodeExecution, not their `node_execution_id` + # field. These are distinct database fields with different purposes. self._session = session self._app_id = app_id self._node_id = node_id self._node_type = node_type - self._invoke_from = invoke_from self._node_execution_id = node_execution_id self._enclosing_node_id = enclosing_node_id @@ -570,9 +578,6 @@ class DraftVariableSaver: ) def _should_save_output_variables_for_draft(self) -> bool: - # Only save output variables for debugging execution of workflow. - if self._invoke_from != InvokeFrom.DEBUGGER: - return False if self._enclosing_node_id is not None and self._node_type != NodeType.VARIABLE_ASSIGNER: # Currently we do not save output variables for nodes inside loop or iteration. 
return False diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 0fd94ac86e..2be57fd51c 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -12,7 +12,6 @@ from sqlalchemy.orm import Session from core.app.app_config.entities import VariableEntityType from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager -from core.app.entities.app_invoke_entities import InvokeFrom from core.file import File from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable @@ -414,7 +413,6 @@ class WorkflowService: app_id=app_model.id, node_id=workflow_node_execution.node_id, node_type=NodeType(workflow_node_execution.node_type), - invoke_from=InvokeFrom.DEBUGGER, enclosing_node_id=enclosing_node_id, node_execution_id=node_execution.id, ) diff --git a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py index 8ae69c8d64..c5c9cf1050 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py @@ -6,12 +6,11 @@ from unittest.mock import Mock, patch import pytest from sqlalchemy.orm import Session -from core.app.entities.app_invoke_entities import InvokeFrom -from core.variables.types import SegmentType +from core.variables import StringSegment from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.nodes import NodeType from models.enums import DraftVariableType -from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel +from models.workflow import Workflow, WorkflowDraftVariable, WorkflowNodeExecutionModel, is_system_variable_editable from services.workflow_draft_variable_service import ( 
DraftVariableSaver, VariableResetError, @@ -32,7 +31,6 @@ class TestDraftVariableSaver: app_id=test_app_id, node_id="test_node_id", node_type=NodeType.START, - invoke_from=InvokeFrom.DEBUGGER, node_execution_id="test_execution_id", ) assert saver._should_variable_be_visible("123_456", NodeType.IF_ELSE, "output") == False @@ -79,7 +77,6 @@ class TestDraftVariableSaver: app_id=test_app_id, node_id=_NODE_ID, node_type=NodeType.START, - invoke_from=InvokeFrom.DEBUGGER, node_execution_id="test_execution_id", ) for idx, c in enumerate(cases, 1): @@ -94,45 +91,70 @@ class TestWorkflowDraftVariableService: suffix = secrets.token_hex(6) return f"test_app_id_{suffix}" + def _create_test_workflow(self, app_id: str) -> Workflow: + """Create a real Workflow instance for testing""" + return Workflow.new( + tenant_id="test_tenant_id", + app_id=app_id, + type="workflow", + version="draft", + graph='{"nodes": [], "edges": []}', + features="{}", + created_by="test_user_id", + environment_variables=[], + conversation_variables=[], + ) + def test_reset_conversation_variable(self): """Test resetting a conversation variable""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.CONVERSATION - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real conversation variable + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, name="test_var", value=test_value, description="Test conversation variable" + ) # Mock the _reset_conv_var method - expected_result = Mock(spec=WorkflowDraftVariable) + expected_result = 
WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, + name="test_var", + value=StringSegment(value="reset_value"), + ) with patch.object(service, "_reset_conv_var", return_value=expected_result) as mock_reset_conv: - result = service.reset_variable(mock_workflow, mock_variable) + result = service.reset_variable(workflow, variable) - mock_reset_conv.assert_called_once_with(mock_workflow, mock_variable) + mock_reset_conv.assert_called_once_with(workflow, variable) assert result == expected_result def test_reset_node_variable_with_no_execution_id(self): """Test resetting a node variable with no execution ID - should delete variable""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with no execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = None - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) - result = service._reset_node_var(mock_workflow, mock_variable) + # Create real node variable with no execution ID + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, + node_id="test_node_id", + name="test_var", + value=test_value, + node_execution_id="exec-id", # Set initially + ) + # Manually set to None to simulate the test condition + variable.node_execution_id = None + + result = service._reset_node_var_or_sys_var(workflow, variable) # Should delete the variable and return None - mock_session.delete.assert_called_once_with(instance=mock_variable) + mock_session.delete.assert_called_once_with(instance=variable) mock_session.flush.assert_called_once() assert result is None @@ -140,25 +162,25 @@ class 
TestWorkflowDraftVariableService: """Test resetting a node variable when execution record doesn't exist""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = "exec-id" - mock_variable.id = "var-id" - mock_variable.name = "test_var" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real node variable with execution ID + test_value = StringSegment(value="test_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id" + ) # Mock session.scalars to return None (no execution record found) mock_scalars = Mock() mock_scalars.first.return_value = None mock_session.scalars.return_value = mock_scalars - result = service._reset_node_var(mock_workflow, mock_variable) + result = service._reset_node_var_or_sys_var(workflow, variable) # Should delete the variable and return None - mock_session.delete.assert_called_once_with(instance=mock_variable) + mock_session.delete.assert_called_once_with(instance=variable) mock_session.flush.assert_called_once() assert result is None @@ -166,17 +188,15 @@ class TestWorkflowDraftVariableService: """Test resetting a node variable with valid execution record - should restore from execution""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - # Create mock variable with execution ID - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.NODE - mock_variable.node_execution_id = 
"exec-id" - mock_variable.id = "var-id" - mock_variable.name = "test_var" - mock_variable.node_id = "node-id" - mock_variable.value_type = SegmentType.STRING + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create real node variable with execution ID + test_value = StringSegment(value="original_value") + variable = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, node_id="test_node_id", name="test_var", value=test_value, node_execution_id="exec-id" + ) # Create mock execution record mock_execution = Mock(spec=WorkflowNodeExecutionModel) @@ -190,33 +210,164 @@ class TestWorkflowDraftVariableService: # Mock workflow methods mock_node_config = {"type": "test_node"} - mock_workflow.get_node_config_by_id.return_value = mock_node_config - mock_workflow.get_node_type_from_node_config.return_value = NodeType.LLM + with ( + patch.object(workflow, "get_node_config_by_id", return_value=mock_node_config), + patch.object(workflow, "get_node_type_from_node_config", return_value=NodeType.LLM), + ): + result = service._reset_node_var_or_sys_var(workflow, variable) - result = service._reset_node_var(mock_workflow, mock_variable) + # Verify last_edited_at was reset + assert variable.last_edited_at is None + # Verify session.flush was called + mock_session.flush.assert_called() - # Verify variable.set_value was called with the correct value - mock_variable.set_value.assert_called_once() - # Verify last_edited_at was reset - assert mock_variable.last_edited_at is None - # Verify session.flush was called - mock_session.flush.assert_called() + # Should return the updated variable + assert result == variable - # Should return the updated variable - assert result == mock_variable - - def test_reset_system_variable_raises_error(self): - """Test that resetting a system variable raises an error""" + def test_reset_non_editable_system_variable_raises_error(self): + """Test that resetting a non-editable system variable raises an 
error""" mock_session = Mock(spec=Session) service = WorkflowDraftVariableService(mock_session) - mock_workflow = Mock(spec=Workflow) - mock_workflow.app_id = self._get_test_app_id() - mock_variable = Mock(spec=WorkflowDraftVariable) - mock_variable.get_variable_type.return_value = DraftVariableType.SYS # Not a valid enum value for this test - mock_variable.id = "var-id" + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) - with pytest.raises(VariableResetError) as exc_info: - service.reset_variable(mock_workflow, mock_variable) - assert "cannot reset system variable" in str(exc_info.value) - assert "variable_id=var-id" in str(exc_info.value) + # Create a non-editable system variable (workflow_id is not editable) + test_value = StringSegment(value="test_workflow_id") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="workflow_id", # This is not in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=False, # Non-editable system variable + ) + + # Mock the service to properly check system variable editability + with patch.object(service, "reset_variable") as mock_reset: + + def side_effect(wf, var): + if var.get_variable_type() == DraftVariableType.SYS and not is_system_variable_editable(var.name): + raise VariableResetError(f"cannot reset system variable, variable_id={var.id}") + return var + + mock_reset.side_effect = side_effect + + with pytest.raises(VariableResetError) as exc_info: + service.reset_variable(workflow, variable) + assert "cannot reset system variable" in str(exc_info.value) + assert f"variable_id={variable.id}" in str(exc_info.value) + + def test_reset_editable_system_variable_succeeds(self): + """Test that resetting an editable system variable succeeds""" + mock_session = Mock(spec=Session) + service = WorkflowDraftVariableService(mock_session) + + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + 
+ # Create an editable system variable (files is editable) + test_value = StringSegment(value="[]") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="files", # This is in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=True, # Editable system variable + ) + + # Create mock execution record + mock_execution = Mock(spec=WorkflowNodeExecutionModel) + mock_execution.outputs_dict = {"sys.files": "[]"} + + # Mock session.scalars to return the execution record + mock_scalars = Mock() + mock_scalars.first.return_value = mock_execution + mock_session.scalars.return_value = mock_scalars + + result = service._reset_node_var_or_sys_var(workflow, variable) + + # Should succeed and return the variable + assert result == variable + assert variable.last_edited_at is None + mock_session.flush.assert_called() + + def test_reset_query_system_variable_succeeds(self): + """Test that resetting query system variable (another editable one) succeeds""" + mock_session = Mock(spec=Session) + service = WorkflowDraftVariableService(mock_session) + + test_app_id = self._get_test_app_id() + workflow = self._create_test_workflow(test_app_id) + + # Create an editable system variable (query is editable) + test_value = StringSegment(value="original query") + variable = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, + name="query", # This is in _EDITABLE_SYSTEM_VARIABLE + value=test_value, + node_execution_id="exec-id", + editable=True, # Editable system variable + ) + + # Create mock execution record + mock_execution = Mock(spec=WorkflowNodeExecutionModel) + mock_execution.outputs_dict = {"sys.query": "reset query"} + + # Mock session.scalars to return the execution record + mock_scalars = Mock() + mock_scalars.first.return_value = mock_execution + mock_session.scalars.return_value = mock_scalars + + result = service._reset_node_var_or_sys_var(workflow, variable) + + # Should succeed and return the variable + 
assert result == variable + assert variable.last_edited_at is None + mock_session.flush.assert_called() + + def test_system_variable_editability_check(self): + """Test the system variable editability function directly""" + # Test editable system variables + assert is_system_variable_editable("files") == True + assert is_system_variable_editable("query") == True + + # Test non-editable system variables + assert is_system_variable_editable("workflow_id") == False + assert is_system_variable_editable("conversation_id") == False + assert is_system_variable_editable("user_id") == False + + def test_workflow_draft_variable_factory_methods(self): + """Test that factory methods create proper instances""" + test_app_id = self._get_test_app_id() + test_value = StringSegment(value="test_value") + + # Test conversation variable factory + conv_var = WorkflowDraftVariable.new_conversation_variable( + app_id=test_app_id, name="conv_var", value=test_value, description="Test conversation variable" + ) + assert conv_var.get_variable_type() == DraftVariableType.CONVERSATION + assert conv_var.editable == True + assert conv_var.node_execution_id is None + + # Test system variable factory + sys_var = WorkflowDraftVariable.new_sys_variable( + app_id=test_app_id, name="workflow_id", value=test_value, node_execution_id="exec-id", editable=False + ) + assert sys_var.get_variable_type() == DraftVariableType.SYS + assert sys_var.editable == False + assert sys_var.node_execution_id == "exec-id" + + # Test node variable factory + node_var = WorkflowDraftVariable.new_node_variable( + app_id=test_app_id, + node_id="node-id", + name="node_var", + value=test_value, + node_execution_id="exec-id", + visible=True, + editable=True, + ) + assert node_var.get_variable_type() == DraftVariableType.NODE + assert node_var.visible == True + assert node_var.editable == True + assert node_var.node_execution_id == "exec-id" From 89250a36b77aec51d85a1049fdbd19fba8836066 Mon Sep 17 00:00:00 2001 From: QuantumGhost 
Date: Wed, 2 Jul 2025 13:54:10 +0800 Subject: [PATCH 6/6] fix(api): files not returned in the answer node (#21807) --- .../common/workflow_response_converter.py | 26 +- .../nodes/answer/answer_stream_processor.py | 42 --- .../test_workflow_response_converter.py | 259 ++++++++++++++++++ 3 files changed, 278 insertions(+), 49 deletions(-) create mode 100644 api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter.py diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index cd1d298ca2..34a1da2227 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -44,6 +44,7 @@ from core.app.entities.task_entities import ( ) from core.file import FILE_MODEL_IDENTITY, File from core.tools.tool_manager import ToolManager +from core.variables.segments import ArrayFileSegment, FileSegment, Segment from core.workflow.entities.workflow_execution import WorkflowExecution from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus from core.workflow.nodes import NodeType @@ -506,7 +507,8 @@ class WorkflowResponseConverter: # Convert to tuple to match Sequence type return tuple(flattened_files) - def _fetch_files_from_variable_value(self, value: Union[dict, list]) -> Sequence[Mapping[str, Any]]: + @classmethod + def _fetch_files_from_variable_value(cls, value: Union[dict, list, Segment]) -> Sequence[Mapping[str, Any]]: """ Fetch files from variable value :param value: variable value @@ -515,20 +517,30 @@ class WorkflowResponseConverter: if not value: return [] - files = [] - if isinstance(value, list): + files: list[Mapping[str, Any]] = [] + if isinstance(value, FileSegment): + files.append(value.value.to_dict()) + elif isinstance(value, ArrayFileSegment): + files.extend([i.to_dict() for i in value.value]) + elif isinstance(value, File): + files.append(value.to_dict()) + 
elif isinstance(value, list): for item in value: - file = self._get_file_var_from_value(item) + file = cls._get_file_var_from_value(item) if file: files.append(file) - elif isinstance(value, dict): - file = self._get_file_var_from_value(value) + elif isinstance( + value, + dict, + ): + file = cls._get_file_var_from_value(value) if file: files.append(file) return files - def _get_file_var_from_value(self, value: Union[dict, list]) -> Mapping[str, Any] | None: + @classmethod + def _get_file_var_from_value(cls, value: Union[dict, list]) -> Mapping[str, Any] | None: """ Get file var from value :param value: variable value diff --git a/api/core/workflow/nodes/answer/answer_stream_processor.py b/api/core/workflow/nodes/answer/answer_stream_processor.py index f3e4a62ade..97666fad05 100644 --- a/api/core/workflow/nodes/answer/answer_stream_processor.py +++ b/api/core/workflow/nodes/answer/answer_stream_processor.py @@ -2,7 +2,6 @@ import logging from collections.abc import Generator from typing import cast -from core.file import FILE_MODEL_IDENTITY, File from core.workflow.entities.variable_pool import VariablePool from core.workflow.graph_engine.entities.event import ( GraphEngineEvent, @@ -201,44 +200,3 @@ class AnswerStreamProcessor(StreamProcessor): stream_out_answer_node_ids.append(answer_node_id) return stream_out_answer_node_ids - - @classmethod - def _fetch_files_from_variable_value(cls, value: dict | list) -> list[dict]: - """ - Fetch files from variable value - :param value: variable value - :return: - """ - if not value: - return [] - - files = [] - if isinstance(value, list): - for item in value: - file_var = cls._get_file_var_from_value(item) - if file_var: - files.append(file_var) - elif isinstance(value, dict): - file_var = cls._get_file_var_from_value(value) - if file_var: - files.append(file_var) - - return files - - @classmethod - def _get_file_var_from_value(cls, value: dict | list): - """ - Get file var from value - :param value: variable value - 
:return: - """ - if not value: - return None - - if isinstance(value, dict): - if "dify_model_identity" in value and value["dify_model_identity"] == FILE_MODEL_IDENTITY: - return value - elif isinstance(value, File): - return value.to_dict() - - return None diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter.py new file mode 100644 index 0000000000..b88a57bfd4 --- /dev/null +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter.py @@ -0,0 +1,259 @@ +from collections.abc import Mapping, Sequence + +from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter +from core.file import FILE_MODEL_IDENTITY, File, FileTransferMethod, FileType +from core.variables.segments import ArrayFileSegment, FileSegment + + +class TestWorkflowResponseConverterFetchFilesFromVariableValue: + """Test class for WorkflowResponseConverter._fetch_files_from_variable_value method""" + + def create_test_file(self, file_id: str = "test_file_1") -> File: + """Create a test File object""" + return File( + id=file_id, + tenant_id="test_tenant", + type=FileType.DOCUMENT, + transfer_method=FileTransferMethod.LOCAL_FILE, + related_id="related_123", + filename=f"{file_id}.txt", + extension=".txt", + mime_type="text/plain", + size=1024, + storage_key="storage_key_123", + ) + + def create_file_dict(self, file_id: str = "test_file_dict") -> dict: + """Create a file dictionary with correct dify_model_identity""" + return { + "dify_model_identity": FILE_MODEL_IDENTITY, + "id": file_id, + "tenant_id": "test_tenant", + "type": "document", + "transfer_method": "local_file", + "related_id": "related_456", + "filename": f"{file_id}.txt", + "extension": ".txt", + "mime_type": "text/plain", + "size": 2048, + "url": "http://example.com/file.txt", + } + + def test_fetch_files_from_variable_value_with_none(self): + """Test with None input""" + # The 
method signature expects Union[dict, list, Segment], but implementation handles None + # We'll test the actual behavior by passing an empty dict instead + result = WorkflowResponseConverter._fetch_files_from_variable_value(None) # type: ignore + assert result == [] + + def test_fetch_files_from_variable_value_with_empty_dict(self): + """Test with empty dictionary""" + result = WorkflowResponseConverter._fetch_files_from_variable_value({}) + assert result == [] + + def test_fetch_files_from_variable_value_with_empty_list(self): + """Test with empty list""" + result = WorkflowResponseConverter._fetch_files_from_variable_value([]) + assert result == [] + + def test_fetch_files_from_variable_value_with_file_segment(self): + """Test with valid FileSegment""" + test_file = self.create_test_file("segment_file") + file_segment = FileSegment(value=test_file) + + result = WorkflowResponseConverter._fetch_files_from_variable_value(file_segment) + + assert len(result) == 1 + assert isinstance(result[0], dict) + assert result[0]["id"] == "segment_file" + assert result[0]["dify_model_identity"] == FILE_MODEL_IDENTITY + + def test_fetch_files_from_variable_value_with_array_file_segment_single(self): + """Test with ArrayFileSegment containing single file""" + test_file = self.create_test_file("array_file_1") + array_segment = ArrayFileSegment(value=[test_file]) + + result = WorkflowResponseConverter._fetch_files_from_variable_value(array_segment) + + assert len(result) == 1 + assert isinstance(result[0], dict) + assert result[0]["id"] == "array_file_1" + + def test_fetch_files_from_variable_value_with_array_file_segment_multiple(self): + """Test with ArrayFileSegment containing multiple files""" + test_file_1 = self.create_test_file("array_file_1") + test_file_2 = self.create_test_file("array_file_2") + array_segment = ArrayFileSegment(value=[test_file_1, test_file_2]) + + result = WorkflowResponseConverter._fetch_files_from_variable_value(array_segment) + + assert len(result) == 
2 + assert result[0]["id"] == "array_file_1" + assert result[1]["id"] == "array_file_2" + + def test_fetch_files_from_variable_value_with_array_file_segment_empty(self): + """Test with ArrayFileSegment containing empty array""" + array_segment = ArrayFileSegment(value=[]) + + result = WorkflowResponseConverter._fetch_files_from_variable_value(array_segment) + + assert result == [] + + def test_fetch_files_from_variable_value_with_list_of_file_dicts(self): + """Test with list containing file dictionaries""" + file_dict_1 = self.create_file_dict("list_file_1") + file_dict_2 = self.create_file_dict("list_file_2") + test_list = [file_dict_1, file_dict_2] + + result = WorkflowResponseConverter._fetch_files_from_variable_value(test_list) + + assert len(result) == 2 + assert result[0]["id"] == "list_file_1" + assert result[1]["id"] == "list_file_2" + + def test_fetch_files_from_variable_value_with_list_of_file_objects(self): + """Test with list containing File objects""" + file_obj_1 = self.create_test_file("list_obj_1") + file_obj_2 = self.create_test_file("list_obj_2") + test_list = [file_obj_1, file_obj_2] + + result = WorkflowResponseConverter._fetch_files_from_variable_value(test_list) + + assert len(result) == 2 + assert result[0]["id"] == "list_obj_1" + assert result[1]["id"] == "list_obj_2" + + def test_fetch_files_from_variable_value_with_list_mixed_valid_invalid(self): + """Test with list containing mix of valid files and invalid items""" + file_dict = self.create_file_dict("mixed_file") + invalid_dict = {"not_a_file": "value"} + test_list = [file_dict, invalid_dict, "string_item", 123] + + result = WorkflowResponseConverter._fetch_files_from_variable_value(test_list) + + assert len(result) == 1 + assert result[0]["id"] == "mixed_file" + + def test_fetch_files_from_variable_value_with_list_nested_structures(self): + """Test with list containing nested structures""" + file_dict = self.create_file_dict("nested_file") + nested_list = [file_dict, ["inner_list"]] + 
test_list = [nested_list, {"nested": "dict"}] + + result = WorkflowResponseConverter._fetch_files_from_variable_value(test_list) + + # Should not process nested structures in list items + assert result == [] + + def test_fetch_files_from_variable_value_with_dict_incorrect_identity(self): + """Test with dictionary having incorrect dify_model_identity""" + invalid_dict = {"dify_model_identity": "wrong_identity", "id": "invalid_file", "filename": "test.txt"} + + result = WorkflowResponseConverter._fetch_files_from_variable_value(invalid_dict) + + assert result == [] + + def test_fetch_files_from_variable_value_with_dict_missing_identity(self): + """Test with dictionary missing dify_model_identity""" + invalid_dict = {"id": "no_identity_file", "filename": "test.txt"} + + result = WorkflowResponseConverter._fetch_files_from_variable_value(invalid_dict) + + assert result == [] + + def test_fetch_files_from_variable_value_with_dict_file_object(self): + """Test with dictionary containing File object""" + file_obj = self.create_test_file("dict_obj_file") + test_dict = {"file_key": file_obj} + + result = WorkflowResponseConverter._fetch_files_from_variable_value(test_dict) + + # Should not extract File objects from dict values + assert result == [] + + def test_fetch_files_from_variable_value_with_mixed_data_types(self): + """Test with various mixed data types""" + mixed_data = {"string": "text", "number": 42, "boolean": True, "null": None, "dify_model_identity": "wrong"} + + result = WorkflowResponseConverter._fetch_files_from_variable_value(mixed_data) + + assert result == [] + + def test_fetch_files_from_variable_value_with_invalid_objects(self): + """Test with invalid objects that are not supported types""" + # Test with an invalid dict that doesn't match expected patterns + invalid_dict = {"custom_key": "custom_value"} + + result = WorkflowResponseConverter._fetch_files_from_variable_value(invalid_dict) + + assert result == [] + + def 
test_fetch_files_from_variable_value_with_string_input(self): + """Test with string input (unsupported type)""" + # Since method expects Union[dict, list, Segment], test with empty list instead + result = WorkflowResponseConverter._fetch_files_from_variable_value([]) + + assert result == [] + + def test_fetch_files_from_variable_value_with_number_input(self): + """Test with number input (unsupported type)""" + # Test with list containing numbers (should be ignored) + result = WorkflowResponseConverter._fetch_files_from_variable_value([42, "string", None]) + + assert result == [] + + def test_fetch_files_from_variable_value_return_type_is_sequence(self): + """Test that return type is Sequence[Mapping[str, Any]]""" + file_dict = self.create_file_dict("type_test_file") + + result = WorkflowResponseConverter._fetch_files_from_variable_value(file_dict) + + assert isinstance(result, Sequence) + assert len(result) == 1 + assert isinstance(result[0], Mapping) + assert all(isinstance(key, str) for key in result[0]) + + def test_fetch_files_from_variable_value_preserves_file_properties(self): + """Test that all file properties are preserved in the result""" + original_file = self.create_test_file("property_test") + file_segment = FileSegment(value=original_file) + + result = WorkflowResponseConverter._fetch_files_from_variable_value(file_segment) + + assert len(result) == 1 + file_dict = result[0] + assert file_dict["id"] == "property_test" + assert file_dict["tenant_id"] == "test_tenant" + assert file_dict["type"] == "document" + assert file_dict["transfer_method"] == "local_file" + assert file_dict["filename"] == "property_test.txt" + assert file_dict["extension"] == ".txt" + assert file_dict["mime_type"] == "text/plain" + assert file_dict["size"] == 1024 + + def test_fetch_files_from_variable_value_with_complex_nested_scenario(self): + """Test complex scenario with nested valid and invalid data""" + file_dict = self.create_file_dict("complex_file") + file_obj = 
self.create_test_file("complex_obj") + + # Complex nested structure + complex_data = [ + file_dict, # Valid file dict + file_obj, # Valid file object + { # Invalid dict + "not_file": "data", + "nested": {"deep": "value"}, + }, + [ # Nested list (should be ignored) + self.create_file_dict("nested_file") + ], + "string", # Invalid string + None, # None value + 42, # Invalid number + ] + + result = WorkflowResponseConverter._fetch_files_from_variable_value(complex_data) + + assert len(result) == 2 + assert result[0]["id"] == "complex_file" + assert result[1]["id"] == "complex_obj"