WIP: feat(api): hitl debugging

This commit is contained in:
QuantumGhost
2025-12-26 10:19:26 +08:00
parent 5d0dd329f2
commit 513048c397
2 changed files with 352 additions and 1 deletions

View File

@ -14,14 +14,18 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.file import File
from core.repositories import DifyCoreRepositoryFactory
from core.variables import Variable
from core.variables.consts import SELECTORS_LENGTH
from core.variables.variables import VariableUnion
from core.workflow.entities import WorkflowNodeExecution
from core.workflow.entities import GraphInitParams, WorkflowNodeExecution
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunSucceededEvent
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes import NodeType
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.human_input.entities import _OUTPUT_VARIABLE_PATTERN, HumanInputNodeData
from core.workflow.nodes.human_input.human_input_node import HumanInputNode
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.start.entities import StartNodeData
from core.workflow.runtime import VariablePool
@ -744,6 +748,213 @@ class WorkflowService:
return workflow_node_execution
def get_human_input_form_preview(
self,
*,
app_model: App,
account: Account,
node_id: str,
manual_inputs: Mapping[str, Any] | None = None,
) -> Mapping[str, Any]:
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
if node_type is not NodeType.HUMAN_INPUT:
raise ValueError("Node type must be human-input.")
variable_pool = self._build_human_input_variable_pool(
app_model=app_model,
workflow=draft_workflow,
node_config=node_config,
manual_inputs=manual_inputs or {},
)
node = self._build_human_input_node(
workflow=draft_workflow,
account=account,
node_config=node_config,
variable_pool=variable_pool,
)
rendered_content = node._render_form_content()
resolved_placeholder_values = node._resolve_inputs()
node_data = cast(HumanInputNodeData, node.get_base_node_data())
human_input_required = HumanInputRequired(
form_id=node_id,
form_content=rendered_content,
inputs=node_data.inputs,
actions=node_data.user_actions,
node_id=node_id,
node_title=node.title,
resolved_placeholder_values=resolved_placeholder_values,
web_app_form_token=None,
)
return human_input_required.model_dump(mode="json")
def submit_human_input_form_preview(
self,
*,
app_model: App,
account: Account,
node_id: str,
form_inputs: Mapping[str, Any],
action: str,
) -> Mapping[str, Any]:
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
if node_type is not NodeType.HUMAN_INPUT:
raise ValueError("Node type must be human-input.")
variable_pool = self._build_human_input_variable_pool(
app_model=app_model,
workflow=draft_workflow,
node_config=node_config,
manual_inputs={},
)
node = self._build_human_input_node(
workflow=draft_workflow,
account=account,
node_config=node_config,
variable_pool=variable_pool,
)
node_data = cast(HumanInputNodeData, node.get_base_node_data())
available_actions = {user_action.id for user_action in node_data.user_actions}
if action not in available_actions:
raise ValueError(f"Invalid action: {action}")
expected_inputs = {form_input.output_variable_name for form_input in node_data.inputs}
missing_inputs = [name for name in expected_inputs if name not in form_inputs]
if missing_inputs:
missing_list = ", ".join(sorted(missing_inputs))
raise ValueError(f"Missing inputs: {missing_list}")
rendered_content = node._render_form_content()
filled_inputs = dict(form_inputs)
rendered_content_with_outputs = self._render_content_with_output_values(rendered_content, filled_inputs)
outputs: dict[str, Any] = dict(filled_inputs)
outputs["__action_id"] = action
outputs["__rendered_content"] = rendered_content_with_outputs
enclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config)
enclosing_node_id = enclosing_node_type_and_id[1] if enclosing_node_type_and_id else None
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
app_id=app_model.id,
node_id=node_id,
node_type=NodeType.HUMAN_INPUT,
node_execution_id=str(uuid.uuid4()),
user=account,
enclosing_node_id=enclosing_node_id,
)
draft_var_saver.save(outputs=outputs, process_data={})
session.commit()
return outputs
def _build_human_input_node(
self,
*,
workflow: Workflow,
account: Account,
node_config: Mapping[str, Any],
variable_pool: VariablePool,
) -> HumanInputNode:
graph_init_params = GraphInitParams(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
graph_config=workflow.graph_dict,
user_id=account.id,
user_from=UserFrom.ACCOUNT.value,
invoke_from=InvokeFrom.DEBUGGER.value,
call_depth=0,
)
graph_runtime_state = GraphRuntimeState(
variable_pool=variable_pool,
start_at=time.perf_counter(),
)
node = HumanInputNode(
id=node_config.get("id", str(uuid.uuid4())),
config=node_config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
node.init_node_data(node_config.get("data", {}))
return node
def _build_human_input_variable_pool(
self,
*,
app_model: App,
workflow: Workflow,
node_config: Mapping[str, Any],
manual_inputs: Mapping[str, Any],
) -> VariablePool:
with Session(bind=db.engine, expire_on_commit=False) as session, session.begin():
draft_var_srv = WorkflowDraftVariableService(session)
draft_var_srv.prefill_conversation_variable_default_values(workflow)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
environment_variables=workflow.environment_variables,
conversation_variables=[],
)
variable_loader = DraftVarLoader(
engine=db.engine,
app_id=app_model.id,
tenant_id=app_model.tenant_id,
)
variable_mapping = HumanInputNode.extract_variable_selector_to_variable_mapping(
graph_config=workflow.graph_dict,
config=node_config,
)
selectors_to_load: list[list[str]] = []
for selector in variable_mapping.values():
if variable_pool.get(selector) is None:
selectors_to_load.append(list(selector))
loaded_variables = variable_loader.load_variables(selectors_to_load)
for variable in loaded_variables:
variable_pool.add([variable.selector[0], variable.selector[1]], variable)
for raw_key, value in manual_inputs.items():
selector = self._parse_selector(raw_key)
variable_pool.add(selector, value)
return variable_pool
def _parse_selector(self, selector_key: str) -> list[str]:
cleaned = selector_key.strip()
if cleaned.startswith("#") and cleaned.endswith("#"):
cleaned = cleaned[1:-1]
selector = cleaned.split(".")
if len(selector) != SELECTORS_LENGTH:
raise ValueError(f"Invalid selector '{selector_key}', expected format '<node_id>.<variable_name>'.")
return selector
def _render_content_with_output_values(self, content: str, outputs: Mapping[str, Any]) -> str:
def _replace(match):
field_name = match.group("field_name")
value = outputs.get(field_name)
if value is None:
return ""
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value)
return _OUTPUT_VARIABLE_PATTERN.sub(_replace, content)
def run_free_workflow_node(
self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
) -> WorkflowNodeExecution: