feat(api): implement zero-migration transparent upgrade (Phase 8)

Add two feature-flag-controlled upgrade paths that allow existing apps
and LLM nodes to transparently run through the Agent V2 engine without
any database migration:

1. AGENT_V2_TRANSPARENT_UPGRADE (default: off):
   When enabled, old apps (chat/completion/agent-chat) bypass legacy
   Easy-UI runners. VirtualWorkflowSynthesizer converts AppModelConfig
   to an in-memory Workflow (start -> agent-v2 -> answer) at runtime,
   then executes via AdvancedChatAppGenerator. Falls back to legacy
   path on any synthesis error.

   VirtualWorkflowSynthesizer maps:
   - model JSON -> ModelConfig
   - pre_prompt/chat_prompt_config -> prompt_template
   - agent_mode.tools -> ToolMetadata[]
   - agent_mode.strategy -> agent_strategy
   - dataset_configs -> context
   - file_upload -> vision

2. AGENT_V2_REPLACES_LLM (default: off):
   When enabled, DifyNodeFactory.create_node() transparently remaps
   nodes with type="llm" to type="agent-v2" before class resolution.
   Since AgentV2NodeData is a strict superset of LLMNodeData, the
   mapping is lossless. With tools=[], Agent V2 behaves identically
   to LLM Node.

Both flags default to False for safety; disabling a flag instantly restores the legacy code path, so rollback requires no migration.
46 existing tests pass. Flask starts successfully.

Made-with: Cursor
This commit is contained in:
Yansong Zhang
2026-04-09 10:30:52 +08:00
parent 96374d7f6a
commit 66212e3575
4 changed files with 350 additions and 0 deletions

View File

@ -1322,6 +1322,22 @@ class CollaborationConfig(BaseSettings):
)
class AgentV2UpgradeConfig(BaseSettings):
    """Feature flags gating the zero-migration Agent V2 upgrade paths.

    Both flags default to False so the legacy behavior is untouched
    unless explicitly opted in; turning a flag off restores the legacy
    path immediately.
    """

    AGENT_V2_TRANSPARENT_UPGRADE: bool = Field(
        default=False,
        description="Transparently run old apps (chat/completion/agent-chat) through the Agent V2 workflow engine. "
        "When enabled, old apps synthesize a virtual workflow at runtime instead of using legacy runners.",
    )

    AGENT_V2_REPLACES_LLM: bool = Field(
        default=False,
        description="Transparently replace LLM nodes in workflows with Agent V2 nodes at runtime. "
        "LLMNodeData is remapped to AgentV2NodeData with tools=[] (identical behavior).",
    )
class LoginConfig(BaseSettings):
ENABLE_EMAIL_CODE_LOGIN: bool = Field(
description="whether to enable email code login",
@ -1450,6 +1466,7 @@ class FeatureConfig(
WorkflowNodeExecutionConfig,
WorkspaceConfig,
CollaborationConfig,
AgentV2UpgradeConfig,
LoginConfig,
AccountConfig,
SwaggerUIConfig,

View File

@ -331,6 +331,11 @@ class DifyNodeFactory(NodeFactory):
typed_node_config = NodeConfigDictAdapter.validate_python(normalize_node_config_for_graph(node_config))
node_id = typed_node_config["id"]
node_data = typed_node_config["data"]
if node_data.type == BuiltinNodeTypes.LLM and dify_config.AGENT_V2_REPLACES_LLM:
node_data = self._remap_llm_to_agent_v2(node_data)
typed_node_config["data"] = node_data
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
node_type = node_data.type
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
@ -419,6 +424,23 @@ class DifyNodeFactory(NodeFactory):
"""Resolve sandbox from run_context, if available."""
return self.graph_init_params.run_context.get(DIFY_SANDBOX_CONTEXT_KEY)
@staticmethod
def _remap_llm_to_agent_v2(node_data: BaseNodeData) -> BaseNodeData:
    """Convert LLM node data into equivalent Agent V2 node data.

    AgentV2NodeData carries every LLM field plus the agent-specific
    ones (tools / max_iterations / agent_strategy), so the conversion
    loses no information. The defaults injected here (empty tool list,
    "auto" strategy) make the resulting node behave like a plain LLM node.
    """
    from core.workflow.nodes.agent_v2.entities import AGENT_V2_NODE_TYPE, AgentV2NodeData

    # Dump to a plain dict, retag the node type, then backfill the
    # agent-only fields that LLMNodeData does not define.
    payload = dict(node_data.model_dump(), type=AGENT_V2_NODE_TYPE)
    for key, fallback in (("tools", []), ("max_iterations", 10), ("agent_strategy", "auto")):
        payload.setdefault(key, fallback)
    return AgentV2NodeData.model_validate(payload)
@staticmethod
def _validate_resolved_node_data(node_class: type[Node], node_data: BaseNodeData) -> BaseNodeData:
"""

View File

@ -125,6 +125,77 @@ class AppGenerateService:
if app_model.is_agent and app_model.mode not in {AppMode.AGENT_CHAT, AppMode.AGENT}
else app_model.mode
)
if (
effective_mode in {AppMode.COMPLETION, AppMode.CHAT, AppMode.AGENT_CHAT}
and dify_config.AGENT_V2_TRANSPARENT_UPGRADE
):
from services.workflow.virtual_workflow import VirtualWorkflowSynthesizer
try:
virtual_workflow = VirtualWorkflowSynthesizer.synthesize(app_model)
logger.info(
"[AGENT_V2_UPGRADE] Transparent upgrade for app %s (mode=%s)",
app_model.id,
effective_mode,
)
workflow_id_arg = args.get("workflow_id")
if not workflow_id_arg:
workflow = virtual_workflow
else:
workflow = cls._get_workflow(app_model, invoke_from, workflow_id_arg)
if streaming:
with rate_limit_context(rate_limit, request_id):
payload = AppExecutionParams.new(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=True,
call_depth=0,
)
payload_json = payload.model_dump_json()
def on_subscribe():
workflow_based_app_execution_task.delay(payload_json)
on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe)
generator = AdvancedChatAppGenerator()
return rate_limit.generate(
generator.convert_to_event_stream(
generator.retrieve_events(
AppMode.AGENT,
payload.workflow_run_id,
on_subscribe=on_subscribe,
),
),
request_id=request_id,
)
else:
advanced_generator = AdvancedChatAppGenerator()
return rate_limit.generate(
advanced_generator.convert_to_event_stream(
advanced_generator.generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
workflow_run_id=str(uuid.uuid4()),
streaming=False,
)
),
request_id=request_id,
)
except Exception:
logger.warning(
"[AGENT_V2_UPGRADE] Transparent upgrade failed for app %s, falling back to legacy",
app_model.id,
exc_info=True,
)
match effective_mode:
case AppMode.COMPLETION:
return rate_limit.generate(

View File

@ -0,0 +1,240 @@
"""Virtual Workflow Synthesizer for transparent old-app upgrade.
Converts an old App's AppModelConfig into an in-memory Workflow object
with a single agent-v2 node, without persisting to the database.
This allows legacy apps (chat/completion/agent-chat) to run through
the Agent V2 workflow engine transparently.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from uuid import uuid4
from core.workflow.nodes.agent_v2.entities import AGENT_V2_NODE_TYPE
from models.model import App, AppMode, AppModelConfig
logger = logging.getLogger(__name__)
class VirtualWorkflowSynthesizer:
    """Builds an in-memory Workflow from a legacy AppModelConfig."""

    @staticmethod
    def synthesize(app: App) -> Any:
        """Turn a legacy app's config into a transient Workflow.

        The returned Workflow is never persisted to the database; it
        exists only so the app can be executed through
        AdvancedChatAppGenerator.generate().

        Raises:
            ValueError: if the app has no AppModelConfig attached.
        """
        from models.workflow import Workflow, WorkflowType

        config = app.app_model_config
        if not config:
            raise ValueError("App has no model config")

        # Completion apps terminate in an "end" node; everything else is
        # chat-like and terminates in an "answer" node with memory.
        chat_like = app.mode != AppMode.COMPLETION

        node_data: dict[str, Any] = {
            "type": AGENT_V2_NODE_TYPE,
            "title": "Agent",
            "model": _extract_model_config(config),
            "prompt_template": _build_prompt_template(config, app.mode),
            "tools": _extract_tools(config),
            "max_iterations": _extract_max_iterations(config),
            "agent_strategy": _extract_strategy(config),
            "context": _build_context_config(config),
            "vision": _build_vision_config(config),
        }
        if chat_like:
            # Legacy chat apps keep conversation history; mirror that here.
            node_data["memory"] = {"window": {"enabled": True, "size": 50}}

        virtual = Workflow()
        virtual.id = str(uuid4())
        virtual.tenant_id = app.tenant_id
        virtual.app_id = app.id
        virtual.type = WorkflowType.CHAT if chat_like else WorkflowType.WORKFLOW
        virtual.version = "virtual"
        virtual.graph = json.dumps(_build_graph(node_data, chat_like))
        virtual.features = "{}"
        virtual.created_by = app.created_by
        virtual.updated_by = app.updated_by
        return virtual
def _extract_model_config(config: AppModelConfig) -> dict[str, Any]:
if config.model:
try:
return json.loads(config.model)
except (json.JSONDecodeError, TypeError):
pass
return {"provider": "openai", "name": "gpt-4o", "mode": "chat", "completion_params": {}}
def _build_prompt_template(config: AppModelConfig, mode: str) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
if config.prompt_type and config.prompt_type.value == "advanced":
if config.chat_prompt_config:
try:
chat_config = json.loads(config.chat_prompt_config)
if isinstance(chat_config, dict) and "prompt" in chat_config:
prompts = chat_config["prompt"]
if isinstance(prompts, list):
for p in prompts:
if isinstance(p, dict) and "role" in p and "text" in p:
messages.append({"role": p["role"], "text": p["text"]})
except (json.JSONDecodeError, TypeError):
pass
if not messages:
pre_prompt = config.pre_prompt or ""
if pre_prompt:
messages.append({"role": "system", "text": pre_prompt})
if mode == AppMode.COMPLETION:
messages.append({"role": "user", "text": "{{#sys.query#}}"})
else:
messages.append({"role": "user", "text": "{{#sys.query#}}"})
return messages
def _extract_tools(config: AppModelConfig) -> list[dict[str, Any]]:
if not config.agent_mode:
return []
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return []
if not isinstance(agent_mode, dict) or not agent_mode.get("enabled"):
return []
tools_config = agent_mode.get("tools", [])
result: list[dict[str, Any]] = []
for tool in tools_config:
if not isinstance(tool, dict):
continue
if not tool.get("enabled", True):
continue
provider_type = tool.get("provider_type", "builtin")
provider_id = tool.get("provider_id", "")
tool_name = tool.get("tool_name", "")
if not tool_name:
continue
result.append({
"enabled": True,
"type": provider_type,
"provider_name": provider_id,
"tool_name": tool_name,
"parameters": tool.get("tool_parameters", {}),
"settings": {},
})
return result
def _extract_strategy(config: AppModelConfig) -> str:
if not config.agent_mode:
return "auto"
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return "auto"
strategy = agent_mode.get("strategy", "")
mapping = {
"function_call": "function-calling",
"react": "chain-of-thought",
}
return mapping.get(strategy, "auto")
def _extract_max_iterations(config: AppModelConfig) -> int:
if not config.agent_mode:
return 10
try:
agent_mode = json.loads(config.agent_mode) if isinstance(config.agent_mode, str) else config.agent_mode
except (json.JSONDecodeError, TypeError):
return 10
return agent_mode.get("max_iteration", 10)
def _build_context_config(config: AppModelConfig) -> dict[str, Any]:
if config.dataset_configs:
try:
dc = json.loads(config.dataset_configs) if isinstance(config.dataset_configs, str) else config.dataset_configs
if isinstance(dc, dict) and dc.get("datasets", {}).get("datasets", []):
return {"enabled": True}
except (json.JSONDecodeError, TypeError):
pass
return {"enabled": False}
def _build_vision_config(config: AppModelConfig) -> dict[str, Any]:
if config.file_upload:
try:
fu = json.loads(config.file_upload) if isinstance(config.file_upload, str) else config.file_upload
if isinstance(fu, dict) and fu.get("image", {}).get("enabled"):
return {"enabled": True}
except (json.JSONDecodeError, TypeError):
pass
return {"enabled": False}
def _build_graph(agent_data: dict[str, Any], is_chat: bool) -> dict[str, Any]:
nodes: list[dict[str, Any]] = [
{
"id": "start",
"type": "custom",
"data": {"type": "start", "title": "Start", "variables": []},
"position": {"x": 80, "y": 282},
},
{
"id": "agent",
"type": "custom",
"data": agent_data,
"position": {"x": 400, "y": 282},
},
]
if is_chat:
nodes.append({
"id": "answer",
"type": "custom",
"data": {"type": "answer", "title": "Answer", "answer": "{{#agent.text#}}"},
"position": {"x": 720, "y": 282},
})
end_id = "answer"
else:
nodes.append({
"id": "end",
"type": "custom",
"data": {"type": "end", "title": "End", "outputs": [{"value_selector": ["agent", "text"], "variable": "result"}]},
"position": {"x": 720, "y": 282},
})
end_id = "end"
edges = [
{"id": "start-agent", "source": "start", "target": "agent", "sourceHandle": "source", "targetHandle": "target"},
{"id": f"agent-{end_id}", "source": "agent", "target": end_id, "sourceHandle": "source", "targetHandle": "target"},
]
return {"nodes": nodes, "edges": edges}