mirror of
https://github.com/langgenius/dify.git
synced 2026-04-26 13:45:57 +08:00
fix: tighten repeated trace matching
This commit is contained in:
@ -33,6 +33,7 @@ import {
|
||||
import { useToastContext } from '@/app/components/base/toast/context'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import useTimestamp from '@/hooks/use-timestamp'
|
||||
import { useParams, usePathname } from '@/next/navigation'
|
||||
import {
|
||||
@ -60,30 +61,10 @@ const findParallelTraceIndex = (
|
||||
tracing: ParallelTraceLike[],
|
||||
data: Partial<ParallelTraceLike>,
|
||||
) => {
|
||||
const incomingParallelId = data.execution_metadata?.parallel_id ?? data.parallel_id
|
||||
|
||||
if (data.id) {
|
||||
const matchedByIdIndex = tracing.findIndex((item) => {
|
||||
if (item.id !== data.id)
|
||||
return false
|
||||
|
||||
const existingParallelId = item.execution_metadata?.parallel_id ?? item.parallel_id
|
||||
if (!existingParallelId || !incomingParallelId)
|
||||
return true
|
||||
|
||||
return existingParallelId === incomingParallelId
|
||||
})
|
||||
|
||||
if (matchedByIdIndex > -1)
|
||||
return matchedByIdIndex
|
||||
}
|
||||
|
||||
return tracing.findIndex((item) => {
|
||||
if (item.node_id !== data.node_id)
|
||||
return false
|
||||
|
||||
const existingParallelId = item.execution_metadata?.parallel_id ?? item.parallel_id
|
||||
return existingParallelId === incomingParallelId
|
||||
return findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId: data.execution_metadata?.parallel_id ?? data.parallel_id,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -6,6 +6,7 @@ import { produce } from 'immer'
|
||||
import { getFilesInLogs } from '@/app/components/base/file-uploader/utils'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import { sseGet } from '@/service/base'
|
||||
|
||||
type Notify = (payload: { type: 'error' | 'warning', message: string }) => void
|
||||
@ -51,12 +52,17 @@ const matchParallelTrace = (trace: WorkflowProcess['tracing'][number], data: Nod
|
||||
}
|
||||
|
||||
const findParallelTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||
return tracing.findIndex((trace) => {
|
||||
if (trace.id && data.id)
|
||||
return trace.id === data.id
|
||||
|
||||
return matchParallelTrace(trace, data)
|
||||
const parallelId = data.execution_metadata?.parallel_id
|
||||
const matchedIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId,
|
||||
})
|
||||
|
||||
if (matchedIndex > -1)
|
||||
return matchedIndex
|
||||
|
||||
return tracing.findIndex(trace => matchParallelTrace(trace, data))
|
||||
}
|
||||
|
||||
const ensureParallelTraceDetails = (details?: NodeTracing['details']) => {
|
||||
@ -120,21 +126,12 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
})
|
||||
}
|
||||
|
||||
const findWorkflowNodeTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||
return tracing.findIndex((trace) => {
|
||||
if (trace.id && data.id)
|
||||
return trace.id === data.id
|
||||
|
||||
return matchParallelTrace(trace, data)
|
||||
})
|
||||
}
|
||||
|
||||
const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||
if (data.iteration_id || data.loop_id)
|
||||
return current
|
||||
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
const currentIndex = findWorkflowNodeTraceIndex(draft.tracing, data)
|
||||
const currentIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
if (currentIndex > -1) {
|
||||
draft.tracing[currentIndex] = {
|
||||
...(draft.tracing[currentIndex].extras
|
||||
|
||||
@ -474,9 +474,7 @@ export const useChat = (
|
||||
onIterationFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -500,9 +498,7 @@ export const useChat = (
|
||||
onLoopFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -544,9 +540,7 @@ export const useChat = (
|
||||
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -831,9 +825,7 @@ export const useChat = (
|
||||
parallelId: nodeFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData),
|
||||
} as any
|
||||
responseItem.workflowProcess.tracing[currentIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData) as any
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user