diff --git a/.gitignore b/.gitignore index aaca9f2b0a..3cd752aec2 100644 --- a/.gitignore +++ b/.gitignore @@ -239,4 +239,5 @@ scripts/stress-test/reports/ *.local.md # Code Agent Folder -.qoder/* \ No newline at end of file +.qoder/* +node_modules/ \ No newline at end of file diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 5d974335ff..ff550bb39d 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -32,7 +32,6 @@ from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotA from core.app.layers.pause_state_persist_layer import PauseStateLayerConfig, PauseStatePersistenceLayer from core.helper.trace_id_helper import extract_external_trace_id_from_args from core.ops.ops_trace_manager import TraceQueueManager -from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import DifyCoreRepositoryFactory from dify_graph.graph_engine.layers.base import GraphEngineLayer from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError @@ -473,10 +472,21 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): db.session.commit() db.session.refresh(conversation) - # get conversation dialogue count + # get conversation dialogue count (lazy — computed only when referenced in workflow) # NOTE: dialogue_count should not start from 0, # because during the first conversation, dialogue_count should be 1. - self._dialogue_count = get_thread_messages_length(conversation.id) + 1 + # Capture the engine reference while in app context to use it in the resolver closure + engine = db.engine + + def _dialogue_count_resolver() -> int: + from sqlalchemy.orm import Session + + from core.prompt.utils.get_thread_messages_length import get_thread_messages_length_with_session + + with Session(engine) as session: + return get_thread_messages_length_with_session(conversation.id, session) + 1 + + self._dialogue_count_resolver = _dialogue_count_resolver # init queue manager queue_manager = MessageBasedAppQueueManager( @@ -612,7 +622,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): queue_manager=queue_manager, conversation=conversation, message=message, - dialogue_count=self._dialogue_count, + dialogue_count_resolver=self._dialogue_count_resolver, variable_loader=variable_loader, workflow=workflow, system_user_id=system_user_id, @@ -675,7 +685,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): conversation=conversation, message=message, user=user, - dialogue_count=self._dialogue_count, + dialogue_count_resolver=self._dialogue_count_resolver, stream=stream, draft_var_saver_factory=draft_var_saver_factory, ) @@ -693,8 +703,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): _T = TypeVar("_T", bound=Base) -def _refresh_model(session, model: _T) -> _T: - with Session(bind=db.engine, expire_on_commit=False) as session: - detach_model = session.get(type(model), model.id) - assert detach_model is not None - return detach_model +def _refresh_model(session: Session, model: _T) -> _T: + detach_model = session.get(type(model), model.id) + assert detach_model is not None + return detach_model diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 66037696af..8c1b98f8d3 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -1,6 +1,6 @@ import logging import time -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from typing import Any, cast from sqlalchemy import select @@ -58,7 +58,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): queue_manager: AppQueueManager, conversation: Conversation, message: Message, - dialogue_count: int, + dialogue_count_resolver: Callable[[], int], variable_loader: VariableLoader, workflow: Workflow, system_user_id: str, @@ -77,7 +77,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): self.application_generate_entity = application_generate_entity self.conversation = conversation self.message = message - self._dialogue_count = dialogue_count + self._dialogue_count_resolver = dialogue_count_resolver self._workflow = workflow self.system_user_id = system_user_id self._app = app @@ -95,11 +95,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): files=self.application_generate_entity.files, conversation_id=self.conversation.id, user_id=self.system_user_id, - dialogue_count=self._dialogue_count, app_id=app_config.app_id, workflow_id=app_config.workflow_id, workflow_execution_id=self.application_generate_entity.workflow_run_id, ) + system_inputs._dialogue_count_resolver = self._dialogue_count_resolver with Session(db.engine, expire_on_commit=False) as session: app_record = session.scalar(select(App).where(App.id == app_config.app_id)) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index f7b5030d33..84bd32b22d 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -97,7 +97,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): message: Message, user: Union[Account, EndUser], stream: bool, - dialogue_count: int, + dialogue_count_resolver: Callable[[], int], draft_var_saver_factory: DraftVariableSaverFactory, ): self._base_task_pipeline = BasedGenerateTaskPipeline( @@ -122,11 +122,11 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): files=application_generate_entity.files, conversation_id=conversation.id, user_id=user_session_id, - dialogue_count=dialogue_count, app_id=application_generate_entity.app_config.app_id, workflow_id=workflow.id, workflow_execution_id=application_generate_entity.workflow_run_id, ) + self._workflow_system_variables._dialogue_count_resolver = dialogue_count_resolver self._workflow_response_converter = WorkflowResponseConverter( application_generate_entity=application_generate_entity, user=user, diff --git a/api/core/app/workflow/layers/llm_quota.py b/api/core/app/workflow/layers/llm_quota.py index faf1516c40..8ddb917362 100644 --- a/api/core/app/workflow/layers/llm_quota.py +++ b/api/core/app/workflow/layers/llm_quota.py @@ -5,9 +5,7 @@ This layer centralizes model-quota deduction outside node implementations. """ import logging -from typing import TYPE_CHECKING, cast, final - -from typing_extensions import override +from typing import TYPE_CHECKING, cast, final, override from core.app.llm import deduct_llm_quota, ensure_llm_quota_available from core.errors.error import QuotaExceededError diff --git a/api/core/app/workflow/layers/observability.py b/api/core/app/workflow/layers/observability.py index 4b20477a7f..a613169433 100644 --- a/api/core/app/workflow/layers/observability.py +++ b/api/core/app/workflow/layers/observability.py @@ -9,11 +9,10 @@ associates with the node span. import logging from dataclasses import dataclass -from typing import cast, final +from typing import cast, final, override from opentelemetry import context as context_api from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_context -from typing_extensions import override from configs import dify_config from dify_graph.enums import BuiltinNodeTypes, NodeType diff --git a/api/core/entities/provider_configuration.py b/api/core/entities/provider_configuration.py index 7179629326..3aa6ee4424 100644 --- a/api/core/entities/provider_configuration.py +++ b/api/core/entities/provider_configuration.py @@ -9,7 +9,6 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator from sqlalchemy import func, select from sqlalchemy.orm import Session -from configs import dify_config from constants import HIDDEN_VALUE from core.entities.model_entities import ModelStatus, ModelWithProviderEntity, SimpleModelProviderEntity from core.entities.provider_entities import ( diff --git a/api/core/prompt/utils/get_thread_messages_length.py b/api/core/prompt/utils/get_thread_messages_length.py index de64c27a73..6d84504bcf 100644 --- a/api/core/prompt/utils/get_thread_messages_length.py +++ b/api/core/prompt/utils/get_thread_messages_length.py @@ -1,4 +1,5 @@ from sqlalchemy import select +from sqlalchemy.orm import Session from core.prompt.utils.extract_thread_messages import extract_thread_messages from extensions.ext_database import db @@ -22,3 +23,24 @@ def get_thread_messages_length(conversation_id: str) -> int: thread_messages.pop(0) return len(thread_messages) + + +def get_thread_messages_length_with_session(conversation_id: str, session: Session) -> int: + """ + Get the number of thread messages based on the parent message id, using a provided session. + + This variant is used when calling from outside Flask application context (e.g., in a lazy resolver). + """ + # Fetch all messages related to the conversation + stmt = select(Message).where(Message.conversation_id == conversation_id).order_by(Message.created_at.desc()) + + messages = session.scalars(stmt).all() + + # Extract thread messages + thread_messages = extract_thread_messages(messages) + + # Exclude the newly created message with an empty answer + if thread_messages and not thread_messages[0].answer: + thread_messages.pop(0) + + return len(thread_messages) diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index ab34263a79..52cdf5abe4 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -2,11 +2,10 @@ import importlib import pkgutil from collections.abc import Callable, Iterator, Mapping, MutableMapping from functools import lru_cache -from typing import TYPE_CHECKING, Any, TypeAlias, cast, final +from typing import TYPE_CHECKING, Any, TypeAlias, cast, final, override from sqlalchemy import select from sqlalchemy.orm import Session -from typing_extensions import override from configs import dify_config from core.app.entities.app_invoke_entities import DifyRunContext diff --git a/api/dify_graph/graph_engine/command_processing/command_handlers.py b/api/dify_graph/graph_engine/command_processing/command_handlers.py index eefd0c366b..33b2988a12 100644 --- a/api/dify_graph/graph_engine/command_processing/command_handlers.py +++ b/api/dify_graph/graph_engine/command_processing/command_handlers.py @@ -1,7 +1,5 @@ import logging -from typing import final - -from typing_extensions import override +from typing import final, override from dify_graph.entities.pause_reason import SchedulingPause from dify_graph.runtime import VariablePool diff --git a/api/dify_graph/graph_engine/layers/debug_logging.py b/api/dify_graph/graph_engine/layers/debug_logging.py index 1af2e2db9e..98e66406e5 100644 --- a/api/dify_graph/graph_engine/layers/debug_logging.py +++ b/api/dify_graph/graph_engine/layers/debug_logging.py @@ -7,9 +7,7 @@ graph execution for debugging purposes. import logging from collections.abc import Mapping -from typing import Any, final - -from typing_extensions import override +from typing import Any, final, override from dify_graph.graph_events import ( GraphEngineEvent, diff --git a/api/dify_graph/graph_engine/layers/execution_limits.py b/api/dify_graph/graph_engine/layers/execution_limits.py index 48ba5608d9..54a4567120 100644 --- a/api/dify_graph/graph_engine/layers/execution_limits.py +++ b/api/dify_graph/graph_engine/layers/execution_limits.py @@ -11,9 +11,7 @@ When limits are exceeded, the layer automatically aborts execution. import logging import time from enum import StrEnum -from typing import final - -from typing_extensions import override +from typing import final, override from dify_graph.graph_engine.entities.commands import AbortCommand, CommandType from dify_graph.graph_engine.layers import GraphEngineLayer diff --git a/api/dify_graph/graph_engine/worker.py b/api/dify_graph/graph_engine/worker.py index 988c20d72a..5f04af1d9e 100644 --- a/api/dify_graph/graph_engine/worker.py +++ b/api/dify_graph/graph_engine/worker.py @@ -10,9 +10,7 @@ import threading import time from collections.abc import Sequence from datetime import datetime -from typing import TYPE_CHECKING, final - -from typing_extensions import override +from typing import TYPE_CHECKING, final, override from dify_graph.context import IExecutionContext from dify_graph.enums import WorkflowNodeExecutionStatus diff --git a/api/dify_graph/runtime/variable_pool.py b/api/dify_graph/runtime/variable_pool.py index e3ef6a2897..c7e541ec0e 100644 --- a/api/dify_graph/runtime/variable_pool.py +++ b/api/dify_graph/runtime/variable_pool.py @@ -6,7 +6,7 @@ from collections.abc import Mapping, Sequence from copy import deepcopy from typing import Annotated, Any, Union, cast -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, PrivateAttr from dify_graph.constants import ( CONVERSATION_VARIABLE_NODE_ID, @@ -15,7 +15,7 @@ from dify_graph.constants import ( SYSTEM_VARIABLE_NODE_ID, ) from dify_graph.file import File, FileAttribute, file_manager -from dify_graph.system_variable import SystemVariable +from dify_graph.system_variable import SystemVariable, SystemVariableKey from dify_graph.variables import Segment, SegmentGroup, VariableBase from dify_graph.variables.consts import SELECTORS_LENGTH from dify_graph.variables.segments import FileSegment, ObjectSegment @@ -59,6 +59,8 @@ class VariablePool(BaseModel): default_factory=list, ) + _lazy_resolvers: dict[tuple[str, str], Callable[[], Any]] = PrivateAttr(default_factory=dict) + def model_post_init(self, context: Any, /): # Create a mapping from field names to SystemVariableKey enum values self._add_system_variables(self.system_variables) @@ -168,7 +170,15 @@ class VariablePool(BaseModel): segment: Segment | None = node_map.get(name) if segment is None: - return None + # Check if this is a lazy system variable that hasn't been resolved yet + resolver = self._lazy_resolvers.get((node_id, name)) + if resolver is not None: + resolved_value = resolver() + self.add((node_id, name), resolved_value) # Cache the resolved value + del self._lazy_resolvers[(node_id, name)] # Remove resolver + segment = node_map.get(name) # Get the newly added segment + else: + return None if len(selector) == 2: return segment @@ -263,15 +273,24 @@ class VariablePool(BaseModel): return result def _add_system_variables(self, system_variable: SystemVariable): + # Handle lazy dialogue_count resolver separately (not in to_dict when value is None) + if system_variable._dialogue_count_resolver is not None: + selector = (SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.DIALOGUE_COUNT) + if not self._has(selector): + self._lazy_resolvers[selector] = system_variable._dialogue_count_resolver + sys_var_mapping = system_variable.to_dict() for key, value in sys_var_mapping.items(): - if value is None: - continue selector = (SYSTEM_VARIABLE_NODE_ID, key) # If the system variable already exists, do not add it again. # This ensures that we can keep the id of the system variables intact. if self._has(selector): continue + # Skip dialogue_count here since it's handled by the lazy resolver above + if key == SystemVariableKey.DIALOGUE_COUNT: + continue + if value is None: + continue self.add(selector, value) @classmethod diff --git a/api/dify_graph/system_variable.py b/api/dify_graph/system_variable.py index cc5deda892..78334c9587 100644 --- a/api/dify_graph/system_variable.py +++ b/api/dify_graph/system_variable.py @@ -1,11 +1,11 @@ from __future__ import annotations -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from types import MappingProxyType from typing import Any from uuid import uuid4 -from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator +from pydantic import AliasChoices, BaseModel, ConfigDict, Field, PrivateAttr, model_validator from dify_graph.enums import SystemVariableKey from dify_graph.file.models import File @@ -49,6 +49,8 @@ class SystemVariable(BaseModel): query: str | None = None conversation_id: str | None = None dialogue_count: int | None = None + + _dialogue_count_resolver: Callable[[], int] | None = PrivateAttr(default=None) document_id: str | None = None original_document_id: str | None = None dataset_id: str | None = None