refactor(workflow): update output schema handling in tool and data source nodes for improved integration with plugin info

This commit is contained in:
twwu
2025-08-26 16:48:18 +08:00
parent 1f5fd13359
commit bdcd9ad9cb
19 changed files with 219 additions and 105 deletions

View File

@ -19,7 +19,7 @@ import { OUTPUT_FILE_SUB_VARIABLES } from '../../../constants'
import type { DocExtractorNodeType } from '../../../document-extractor/types'
import { BlockEnum, InputVarType, VarType } from '@/app/components/workflow/types'
import type { StartNodeType } from '@/app/components/workflow/nodes/start/types'
import type { ConversationVariable, EnvironmentVariable, Node, NodeOutPutVar, ValueSelector, Var } from '@/app/components/workflow/types'
import type { ConversationVariable, EnvironmentVariable, Node, NodeOutPutVar, ToolWithProvider, ValueSelector, Var } from '@/app/components/workflow/types'
import type { VariableAssignerNodeType } from '@/app/components/workflow/nodes/variable-assigner/types'
import type { Field as StructField } from '@/app/components/workflow/nodes/llm/types'
import type { RAGPipelineVariable } from '@/models/pipeline'
@ -35,12 +35,12 @@ import {
TEMPLATE_TRANSFORM_OUTPUT_STRUCT,
TOOL_OUTPUT_STRUCT,
} from '@/app/components/workflow/constants'
import ToolNodeDefault from '@/app/components/workflow/nodes/tool/default'
import DataSourceNodeDefault from '@/app/components/workflow/nodes/data-source/default'
import type { DataSourceNodeType } from '@/app/components/workflow/nodes/data-source/types'
import type { PromptItem } from '@/models/debug'
import { VAR_REGEX } from '@/config'
import type { AgentNodeType } from '../../../agent/types'
import { getOutputVariableAlias } from '@/app/components/workflow/utils/tool'
export const isSystemVar = (valueSelector: ValueSelector) => {
return valueSelector[0] === 'sys' || valueSelector[1] === 'sys'
@ -229,6 +229,7 @@ const formatItem = (
item: any,
isChatMode: boolean,
filterVar: (payload: Var, selector: ValueSelector) => boolean,
allPluginInfoList: Record<string, ToolWithProvider[]>,
ragVars?: Var[],
): NodeOutPutVar => {
const { id, data } = item
@ -398,43 +399,8 @@ const formatItem = (
}
case BlockEnum.Tool: {
const {
output_schema,
} = data as ToolNodeType
if (!output_schema) {
res.vars = TOOL_OUTPUT_STRUCT
}
else {
const outputSchema: any[] = []
Object.keys(output_schema.properties).forEach((outputKey) => {
const output = output_schema.properties[outputKey]
const dataType = output.type
const alias = getOutputVariableAlias(output.properties)
let type = dataType === 'array'
? `array[${output.items?.type.slice(0, 1).toLocaleLowerCase()}${output.items?.type.slice(1)}]`
: `${output.type.slice(0, 1).toLocaleLowerCase()}${output.type.slice(1)}`
if (type === VarType.object && alias === 'file')
type = VarType.file
outputSchema.push({
variable: outputKey,
type,
description: output.description,
alias,
children: output.type === 'object' ? {
schema: {
type: 'object',
properties: output.properties,
},
} : undefined,
})
})
res.vars = [
...TOOL_OUTPUT_STRUCT,
...outputSchema,
]
}
const toolOutputVars = ToolNodeDefault.getOutputVars?.(data as ToolNodeType, allPluginInfoList) || []
res.vars = toolOutputVars
break
}
@ -529,7 +495,7 @@ const formatItem = (
case BlockEnum.DataSource: {
const payload = data as DataSourceNodeType
const dataSourceVars = DataSourceNodeDefault.getOutputVars?.(payload, ragVars) || []
const dataSourceVars = DataSourceNodeDefault.getOutputVars?.(payload, allPluginInfoList, ragVars) || []
res.vars = dataSourceVars
break
}
@ -658,6 +624,7 @@ export const toNodeOutputVars = (
environmentVariables: EnvironmentVariable[] = [],
conversationVariables: ConversationVariable[] = [],
ragVariables: RAGPipelineVariable[] = [],
allPluginInfoList: Record<string, ToolWithProvider[]>,
): NodeOutPutVar[] => {
// ENV_NODE data format
const ENV_NODE = {
@ -708,7 +675,7 @@ export const toNodeOutputVars = (
if (node.data.type === BlockEnum.DataSource)
ragVariablesInDataSource = ragVariables.filter(ragVariable => ragVariable.belong_to_node_id === node.id)
return {
...formatItem(node, isChatMode, filterVar, ragVariablesInDataSource.map(
...formatItem(node, isChatMode, filterVar, allPluginInfoList, ragVariablesInDataSource.map(
(ragVariable: RAGPipelineVariable) => ({
variable: `rag.${node.id}.${ragVariable.variable}`,
type: inputVarTypeToVarType(ragVariable.type as any),
@ -835,6 +802,7 @@ export const getVarType = ({
environmentVariables = [],
conversationVariables = [],
ragVariables = [],
allPluginInfoList,
}: {
valueSelector: ValueSelector
parentNode?: Node | null
@ -846,6 +814,7 @@ export const getVarType = ({
environmentVariables?: EnvironmentVariable[]
conversationVariables?: ConversationVariable[]
ragVariables?: RAGPipelineVariable[]
allPluginInfoList: Record<string, ToolWithProvider[]>
}): VarType => {
if (isConstant)
return VarType.string
@ -857,6 +826,7 @@ export const getVarType = ({
environmentVariables,
conversationVariables,
ragVariables,
allPluginInfoList,
)
const isIterationInnerVar = parentNode?.data.type === BlockEnum.Iteration
@ -982,6 +952,7 @@ export const toNodeAvailableVars = ({
conversationVariables,
ragVariables,
filterVar,
allPluginInfoList,
}: {
parentNode?: Node | null
t?: any
@ -995,6 +966,7 @@ export const toNodeAvailableVars = ({
// rag variables
ragVariables?: RAGPipelineVariable[]
filterVar: (payload: Var, selector: ValueSelector) => boolean
allPluginInfoList: Record<string, ToolWithProvider[]>
}): NodeOutPutVar[] => {
const beforeNodesOutputVars = toNodeOutputVars(
beforeNodes,
@ -1003,6 +975,7 @@ export const toNodeAvailableVars = ({
environmentVariables,
conversationVariables,
ragVariables,
allPluginInfoList,
)
const isInIteration = parentNode?.data.type === BlockEnum.Iteration
if (isInIteration) {
@ -1015,6 +988,7 @@ export const toNodeAvailableVars = ({
isChatMode,
environmentVariables,
conversationVariables,
allPluginInfoList,
})
const itemChildren = itemType === VarType.file
? {

View File

@ -128,7 +128,19 @@ const useOneStepRun = <T>({
const availableNodes = getBeforeNodesInSameBranch(id)
const availableNodesIncludeParent = getBeforeNodesInSameBranchIncludeParent(id)
const allOutputVars = toNodeOutputVars(availableNodes, isChatMode, undefined, undefined, conversationVariables)
const buildInTools = useStore(s => s.buildInTools)
const customTools = useStore(s => s.customTools)
const workflowTools = useStore(s => s.workflowTools)
const mcpTools = useStore(s => s.mcpTools)
const dataSourceList = useStore(s => s.dataSourceList)
const allPluginInfoList = {
buildInTools,
customTools,
workflowTools,
mcpTools,
dataSourceList: dataSourceList ?? [],
}
const allOutputVars = toNodeOutputVars(availableNodes, isChatMode, undefined, undefined, conversationVariables, [], allPluginInfoList)
const getVar = (valueSelector: ValueSelector): Var | undefined => {
const isSystem = valueSelector[0] === 'sys'
const targetVar = allOutputVars.find(item => isSystem ? !!item.isStartNode : item.nodeId === valueSelector[0])
@ -188,11 +200,11 @@ const useOneStepRun = <T>({
const isPaused = isPausedRef.current
// The backend don't support pause the single run, so the frontend handle the pause state.
if(isPaused)
if (isPaused)
return
const canRunLastRun = !isRunAfterSingleRun || runningStatus === NodeRunningStatus.Succeeded
if(!canRunLastRun) {
if (!canRunLastRun) {
doSetRunResult(data)
return
}
@ -202,9 +214,9 @@ const useOneStepRun = <T>({
const { getNodes } = store.getState()
const nodes = getNodes()
appendNodeInspectVars(id, vars, nodes)
if(data?.status === NodeRunningStatus.Succeeded) {
if (data?.status === NodeRunningStatus.Succeeded) {
invalidLastRun()
if(isStartNode)
if (isStartNode)
invalidateSysVarValues()
invalidateConversationVarValues() // loop, iteration, variable assigner node can update the conversation variables, but to simple the logic(some nodes may also can update in the future), all nodes refresh.
}
@ -221,21 +233,21 @@ const useOneStepRun = <T>({
})
}
const checkValidWrap = () => {
if(!checkValid)
if (!checkValid)
return { isValid: true, errorMessage: '' }
const res = checkValid(data, t, moreDataForCheckValid)
if(!res.isValid) {
handleNodeDataUpdate({
id,
data: {
...data,
_isSingleRun: false,
},
})
Toast.notify({
type: 'error',
message: res.errorMessage,
})
if (!res.isValid) {
handleNodeDataUpdate({
id,
data: {
...data,
_isSingleRun: false,
},
})
Toast.notify({
type: 'error',
message: res.errorMessage,
})
}
return res
}
@ -254,7 +266,6 @@ const useOneStepRun = <T>({
const { isValid } = checkValidWrap()
setCanShowSingleRun(isValid)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [data._isSingleRun])
useEffect(() => {
@ -296,9 +307,9 @@ const useOneStepRun = <T>({
if (!isIteration && !isLoop) {
const isStartNode = data.type === BlockEnum.Start
const postData: Record<string, any> = {}
if(isStartNode) {
if (isStartNode) {
const { '#sys.query#': query, '#sys.files#': files, ...inputs } = submitData
if(isChatMode)
if (isChatMode)
postData.conversation_id = ''
postData.inputs = inputs
@ -320,7 +331,7 @@ const useOneStepRun = <T>({
{
onWorkflowStarted: noop,
onWorkflowFinished: (params) => {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,
@ -399,7 +410,7 @@ const useOneStepRun = <T>({
setIterationRunResult(newIterationRunResult)
},
onError: () => {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,
@ -423,7 +434,7 @@ const useOneStepRun = <T>({
{
onWorkflowStarted: noop,
onWorkflowFinished: (params) => {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,
@ -503,7 +514,7 @@ const useOneStepRun = <T>({
setLoopRunResult(newLoopRunResult)
},
onError: () => {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,
@ -525,7 +536,7 @@ const useOneStepRun = <T>({
hasError = true
invalidLastRun()
if (!isIteration && !isLoop) {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,
@ -547,11 +558,11 @@ const useOneStepRun = <T>({
})
}
}
if(isPausedRef.current)
if (isPausedRef.current)
return
if (!isIteration && !isLoop && !hasError) {
if(isPausedRef.current)
if (isPausedRef.current)
return
handleNodeDataUpdate({
id,