Merge remote-tracking branch 'origin/main' into feat/support-agent-sandbox

# Conflicts:
#	api/app.py
#	api/controllers/console/app/generator.py
#	api/core/llm_generator/llm_generator.py
#	web/eslint-suppressions.json
#	web/pnpm-lock.yaml
#	web/tailwind-common-config.ts
This commit is contained in:
yyh
2026-01-30 20:08:35 +08:00
41 changed files with 1479 additions and 1197 deletions

View File

@ -0,0 +1,20 @@
"""Shared payload models for LLM generator helpers and controllers."""
from pydantic import BaseModel, Field
from core.app.app_config.entities import ModelConfig
class RuleGeneratePayload(BaseModel):
instruction: str = Field(..., description="Rule generation instruction")
model_config_data: ModelConfig = Field(..., alias="model_config", description="Model configuration")
no_variable: bool = Field(default=False, description="Whether to exclude variables")
class RuleCodeGeneratePayload(RuleGeneratePayload):
code_language: str = Field(default="javascript", description="Programming language for code generation")
class RuleStructuredOutputPayload(BaseModel):
instruction: str = Field(..., description="Structured output generation instruction")
model_config_data: ModelConfig = Field(..., alias="model_config", description="Model configuration")

View File

@ -6,11 +6,13 @@ from typing import Protocol
import json_repair
from core.app.app_config.entities import ModelConfig
from core.llm_generator.context_models import (
AvailableVarPayload,
CodeContextPayload,
ParameterInfoPayload,
)
from core.llm_generator.entities import RuleCodeGeneratePayload, RuleGeneratePayload, RuleStructuredOutputPayload
from core.llm_generator.output_models import (
CodeNodeStructuredOutput,
InstructionModifyOutput,
@ -158,19 +160,19 @@ class LLMGenerator:
return questions
@classmethod
def generate_rule_config(cls, tenant_id: str, instruction: str, model_config: dict, no_variable: bool):
def generate_rule_config(cls, tenant_id: str, args: RuleGeneratePayload):
output_parser = RuleConfigGeneratorOutputParser()
error = ""
error_step = ""
rule_config = {"prompt": "", "variables": [], "opening_statement": "", "error": ""}
model_parameters = model_config.get("completion_params", {})
if no_variable:
model_parameters = args.model_config_data.completion_params
if args.no_variable:
prompt_template = PromptTemplateParser(WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE)
prompt_generate = prompt_template.format(
inputs={
"TASK_DESCRIPTION": instruction,
"TASK_DESCRIPTION": args.instruction,
},
remove_template_variables=False,
)
@ -182,8 +184,8 @@ class LLMGenerator:
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
provider=args.model_config_data.provider,
model=args.model_config_data.name,
)
try:
@ -197,7 +199,7 @@ class LLMGenerator:
error = str(e)
error_step = "generate rule config"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
logger.exception("Failed to generate rule config, model: %s", args.model_config_data.name)
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -216,7 +218,7 @@ class LLMGenerator:
# format the prompt_generate_prompt
prompt_generate_prompt = prompt_template.format(
inputs={
"TASK_DESCRIPTION": instruction,
"TASK_DESCRIPTION": args.instruction,
},
remove_template_variables=False,
)
@ -227,8 +229,8 @@ class LLMGenerator:
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
provider=args.model_config_data.provider,
model=args.model_config_data.name,
)
try:
@ -257,7 +259,7 @@ class LLMGenerator:
# the second step to generate the task_parameter and task_statement
statement_generate_prompt = statement_template.format(
inputs={
"TASK_DESCRIPTION": instruction,
"TASK_DESCRIPTION": args.instruction,
"INPUT_TEXT": prompt_content.message.get_text_content(),
},
remove_template_variables=False,
@ -283,7 +285,7 @@ class LLMGenerator:
error_step = "generate conversation opener"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
logger.exception("Failed to generate rule config, model: %s", args.model_config_data.name)
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -291,16 +293,20 @@ class LLMGenerator:
return rule_config
@classmethod
def generate_code(cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"):
if code_language == "python":
def generate_code(
cls,
tenant_id: str,
args: RuleCodeGeneratePayload,
):
if args.code_language == "python":
prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
else:
prompt_template = PromptTemplateParser(JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE)
prompt = prompt_template.format(
inputs={
"INSTRUCTION": instruction,
"CODE_LANGUAGE": code_language,
"INSTRUCTION": args.instruction,
"CODE_LANGUAGE": args.code_language,
},
remove_template_variables=False,
)
@ -309,28 +315,28 @@ class LLMGenerator:
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
provider=args.model_config_data.provider,
model=args.model_config_data.name,
)
prompt_messages = [UserPromptMessage(content=prompt)]
model_parameters = model_config.get("completion_params", {})
model_parameters = args.model_config_data.completion_params
try:
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
generated_code = response.message.get_text_content()
return {"code": generated_code, "language": code_language, "error": ""}
return {"code": generated_code, "language": args.code_language, "error": ""}
except InvokeError as e:
error = str(e)
return {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
return {"code": "", "language": args.code_language, "error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
"Failed to invoke LLM model, model: %s, language: %s", args.model_config_data.name, args.code_language
)
return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
return {"code": "", "language": args.code_language, "error": f"An unexpected error occurred: {str(e)}"}
@classmethod
def generate_qa_document(cls, tenant_id: str, query, document_language: str):
@ -360,20 +366,20 @@ class LLMGenerator:
return answer.strip()
@classmethod
def generate_structured_output(cls, tenant_id: str, instruction: str, model_config: dict):
def generate_structured_output(cls, tenant_id: str, args: RuleStructuredOutputPayload):
model_manager = ModelManager()
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
provider=args.model_config_data.provider,
model=args.model_config_data.name,
)
prompt_messages = [
SystemPromptMessage(content=SYSTEM_STRUCTURED_OUTPUT_GENERATE),
UserPromptMessage(content=instruction),
UserPromptMessage(content=args.instruction),
]
model_parameters = model_config.get("model_parameters", {})
model_parameters = args.model_config_data.completion_params
try:
response: LLMResult = model_instance.invoke_llm(
@ -397,7 +403,7 @@ class LLMGenerator:
error = str(e)
return {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
except Exception as e:
logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
logger.exception("Failed to invoke LLM model, model: %s", args.model_config_data.name)
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
@classmethod
@ -670,7 +676,12 @@ Generate {language} code to extract/transform available variables for the target
@staticmethod
def instruction_modify_legacy(
tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
tenant_id: str,
flow_id: str,
current: str,
instruction: str,
model_config: ModelConfig,
ideal_output: str | None,
):
last_run: Message | None = (
db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
@ -709,7 +720,7 @@ Generate {language} code to extract/transform available variables for the target
node_id: str,
current: str,
instruction: str,
model_config: dict,
model_config: ModelConfig,
ideal_output: str | None,
workflow_service: WorkflowServiceInterface,
):
@ -782,7 +793,7 @@ Generate {language} code to extract/transform available variables for the target
@staticmethod
def __instruction_modify_common(
tenant_id: str,
model_config: dict,
model_config: ModelConfig,
last_run: dict | None,
current: str | None,
error_message: str | None,
@ -803,8 +814,8 @@ Generate {language} code to extract/transform available variables for the target
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
provider=model_config.provider,
model=model_config.name,
)
model_name = model_config.get("name", "")
model_schema = model_instance.model_type_instance.get_model_schema(model_name, model_instance.credentials)
@ -845,7 +856,5 @@ Generate {language} code to extract/transform available variables for the target
error = str(e)
return {"error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
)
logger.exception("Failed to invoke LLM model, model: %s", json.dumps(model_config.name), exc_info=True)
return {"error": f"An unexpected error occurred: {str(e)}"}

View File

@ -347,7 +347,7 @@ class BaseSession(
message.message.root.model_dump(by_alias=True, mode="json", exclude_none=True)
)
responder = RequestResponder(
responder = RequestResponder[ReceiveRequestT, SendResultT](
request_id=message.message.root.id,
request_meta=validated_request.root.params.meta if validated_request.root.params else None,
request=validated_request,

View File

@ -92,6 +92,10 @@ def _build_llm_result_from_first_chunk(
Build a single `LLMResult` from the first returned chunk.
This is used for `stream=False` because the plugin side may still implement the response via a chunked stream.
Note:
This function always drains the `chunks` iterator after reading the first chunk to ensure any underlying
streaming resources are released (e.g., HTTP connections owned by the plugin runtime).
"""
content = ""
content_list: list[PromptMessageContentUnionTypes] = []
@ -99,18 +103,25 @@ def _build_llm_result_from_first_chunk(
system_fingerprint: str | None = None
tools_calls: list[AssistantPromptMessage.ToolCall] = []
first_chunk = next(chunks, None)
if first_chunk is not None:
if isinstance(first_chunk.delta.message.content, str):
content += first_chunk.delta.message.content
elif isinstance(first_chunk.delta.message.content, list):
content_list.extend(first_chunk.delta.message.content)
try:
first_chunk = next(chunks, None)
if first_chunk is not None:
if isinstance(first_chunk.delta.message.content, str):
content += first_chunk.delta.message.content
elif isinstance(first_chunk.delta.message.content, list):
content_list.extend(first_chunk.delta.message.content)
if first_chunk.delta.message.tool_calls:
_increase_tool_call(first_chunk.delta.message.tool_calls, tools_calls)
if first_chunk.delta.message.tool_calls:
_increase_tool_call(first_chunk.delta.message.tool_calls, tools_calls)
usage = first_chunk.delta.usage or LLMUsage.empty_usage()
system_fingerprint = first_chunk.system_fingerprint
usage = first_chunk.delta.usage or LLMUsage.empty_usage()
system_fingerprint = first_chunk.system_fingerprint
finally:
try:
for _ in chunks:
pass
except Exception:
logger.debug("Failed to drain non-stream plugin chunk iterator.", exc_info=True)
return LLMResult(
model=model,
@ -283,7 +294,7 @@ class LargeLanguageModel(AIModel):
# TODO
raise self._transform_invoke_error(e)
if stream and isinstance(result, Generator):
if stream and not isinstance(result, LLMResult):
return self._invoke_result_generator(
model=model,
result=result,

View File

@ -314,6 +314,8 @@ class ModelProviderFactory:
elif model_type == ModelType.TTS:
return TTSModel.model_validate(init_params)
raise ValueError(f"Unsupported model type: {model_type}")
def get_provider_icon(self, provider: str, icon_type: str, lang: str) -> tuple[bytes, str]:
"""
Get provider icon

View File

@ -23,8 +23,8 @@ class TriggerDebugEventBus:
"""
# LUA_SELECT: Atomic poll or register for event
# KEYS[1] = trigger_debug_inbox:{tenant_id}:{address_id}
# KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:...
# KEYS[1] = trigger_debug_inbox:{<tenant_id>}:<address_id>
# KEYS[2] = trigger_debug_waiting_pool:{<tenant_id>}:...
# ARGV[1] = address_id
LUA_SELECT = (
"local v=redis.call('GET',KEYS[1]);"
@ -35,7 +35,7 @@ class TriggerDebugEventBus:
)
# LUA_DISPATCH: Dispatch event to all waiting addresses
# KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:...
# KEYS[1] = trigger_debug_waiting_pool:{<tenant_id>}:...
# ARGV[1] = tenant_id
# ARGV[2] = event_json
LUA_DISPATCH = (
@ -43,7 +43,7 @@ class TriggerDebugEventBus:
"if #a==0 then return 0 end;"
"redis.call('DEL',KEYS[1]);"
"for i=1,#a do "
f"redis.call('SET','trigger_debug_inbox:'..ARGV[1]..':'..a[i],ARGV[2],'EX',{TRIGGER_DEBUG_EVENT_TTL});"
f"redis.call('SET','trigger_debug_inbox:{{'..ARGV[1]..'}}'..':'..a[i],ARGV[2],'EX',{TRIGGER_DEBUG_EVENT_TTL});"
"end;"
"return #a"
)
@ -108,7 +108,7 @@ class TriggerDebugEventBus:
Event object if available, None otherwise
"""
address_id: str = hashlib.sha256(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
address: str = f"trigger_debug_inbox:{tenant_id}:{address_id}"
address: str = f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
try:
event_data = redis_client.eval(

View File

@ -42,7 +42,7 @@ def build_webhook_pool_key(tenant_id: str, app_id: str, node_id: str) -> str:
app_id: App ID
node_id: Node ID
"""
return f"{TriggerDebugPoolKey.WEBHOOK}:{tenant_id}:{app_id}:{node_id}"
return f"{TriggerDebugPoolKey.WEBHOOK}:{{{tenant_id}}}:{app_id}:{node_id}"
class PluginTriggerDebugEvent(BaseDebugEvent):
@ -64,4 +64,4 @@ def build_plugin_pool_key(tenant_id: str, provider_id: str, subscription_id: str
provider_id: Provider ID
subscription_id: Subscription ID
"""
return f"{TriggerDebugPoolKey.PLUGIN}:{tenant_id}:{str(provider_id)}:{subscription_id}:{name}"
return f"{TriggerDebugPoolKey.PLUGIN}:{{{tenant_id}}}:{str(provider_id)}:{subscription_id}:{name}"