mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
Compare commits
9 Commits
yanli/fix-
...
review-mys
| Author | SHA1 | Date | |
|---|---|---|---|
| a0af8fb94c | |||
| 179b1efb10 | |||
| d37772f81b | |||
| 0103adc3aa | |||
| f8ad55212e | |||
| d7b0a1679d | |||
| b923090e47 | |||
| e0436bf2db | |||
| 6a164265d6 |
@ -33,6 +33,18 @@ class SortOrder(StrEnum):
|
||||
|
||||
|
||||
class MyScaleVector(BaseVector):
|
||||
_METADATA_KEY_WHITELIST = {
|
||||
"annotation_id",
|
||||
"app_id",
|
||||
"batch",
|
||||
"dataset_id",
|
||||
"doc_hash",
|
||||
"doc_id",
|
||||
"document_id",
|
||||
"lang",
|
||||
"source",
|
||||
}
|
||||
|
||||
def __init__(self, collection_name: str, config: MyScaleConfig, metric: str = "Cosine"):
|
||||
super().__init__(collection_name)
|
||||
self._config = config
|
||||
@ -45,10 +57,17 @@ class MyScaleVector(BaseVector):
|
||||
password=config.password,
|
||||
)
|
||||
self._client.command("SET allow_experimental_object_type=1")
|
||||
self._qualified_table = f"{self._config.database}.{self._collection_name}"
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.MYSCALE
|
||||
|
||||
@classmethod
|
||||
def _validate_metadata_key(cls, key: str) -> str:
|
||||
if key not in cls._METADATA_KEY_WHITELIST:
|
||||
raise ValueError(f"Unsupported metadata key: {key!r}")
|
||||
return key
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
dimension = len(embeddings[0])
|
||||
self._create_collection(dimension)
|
||||
@ -59,7 +78,7 @@ class MyScaleVector(BaseVector):
|
||||
self._client.command(f"CREATE DATABASE IF NOT EXISTS {self._config.database}")
|
||||
fts_params = f"('{self._config.fts_params}')" if self._config.fts_params else ""
|
||||
sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {self._config.database}.{self._collection_name}(
|
||||
CREATE TABLE IF NOT EXISTS {self._qualified_table}(
|
||||
id String,
|
||||
text String,
|
||||
vector Array(Float32),
|
||||
@ -74,73 +93,103 @@ class MyScaleVector(BaseVector):
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
ids = []
|
||||
columns = ["id", "text", "vector", "metadata"]
|
||||
values = []
|
||||
rows = []
|
||||
for i, doc in enumerate(documents):
|
||||
if doc.metadata is not None:
|
||||
doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))
|
||||
row = (
|
||||
doc_id,
|
||||
self.escape_str(doc.page_content),
|
||||
embeddings[i],
|
||||
json.dumps(doc.metadata) if doc.metadata else {},
|
||||
rows.append(
|
||||
(
|
||||
doc_id,
|
||||
doc.page_content,
|
||||
embeddings[i],
|
||||
json.dumps(doc.metadata or {}),
|
||||
)
|
||||
)
|
||||
values.append(str(row))
|
||||
ids.append(doc_id)
|
||||
sql = f"""
|
||||
INSERT INTO {self._config.database}.{self._collection_name}
|
||||
({",".join(columns)}) VALUES {",".join(values)}
|
||||
"""
|
||||
self._client.command(sql)
|
||||
if rows:
|
||||
self._client.insert(self._qualified_table, rows, column_names=columns)
|
||||
return ids
|
||||
|
||||
@staticmethod
|
||||
def escape_str(value: Any) -> str:
|
||||
return "".join(" " if c in {"\\", "'"} else c for c in str(value))
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
results = self._client.query(f"SELECT id FROM {self._config.database}.{self._collection_name} WHERE id='{id}'")
|
||||
results = self._client.query(
|
||||
f"SELECT id FROM {self._qualified_table} WHERE id = %(id)s LIMIT 1",
|
||||
parameters={"id": id},
|
||||
)
|
||||
return results.row_count > 0
|
||||
|
||||
def delete_by_ids(self, ids: list[str]):
|
||||
if not ids:
|
||||
return
|
||||
placeholders, params = self._build_in_params("id", ids)
|
||||
self._client.command(
|
||||
f"DELETE FROM {self._config.database}.{self._collection_name} WHERE id IN {str(tuple(ids))}"
|
||||
f"DELETE FROM {self._qualified_table} WHERE id IN ({placeholders})",
|
||||
parameters=params,
|
||||
)
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
safe_key = self._validate_metadata_key(key)
|
||||
rows = self._client.query(
|
||||
f"SELECT DISTINCT id FROM {self._config.database}.{self._collection_name} WHERE metadata.{key}='{value}'"
|
||||
f"SELECT DISTINCT id FROM {self._qualified_table} WHERE metadata.{safe_key} = %(value)s",
|
||||
parameters={"value": value},
|
||||
).result_rows
|
||||
return [row[0] for row in rows]
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
safe_key = self._validate_metadata_key(key)
|
||||
self._client.command(
|
||||
f"DELETE FROM {self._config.database}.{self._collection_name} WHERE metadata.{key}='{value}'"
|
||||
f"DELETE FROM {self._qualified_table} WHERE metadata.{safe_key} = %(value)s",
|
||||
parameters={"value": value},
|
||||
)
|
||||
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
return self._search(f"distance(vector, {str(query_vector)})", self._vec_order, **kwargs)
|
||||
return self._search(
|
||||
"distance(vector, %(query_vector)s)",
|
||||
self._vec_order,
|
||||
parameters={"query_vector": query_vector},
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
return self._search(f"TextSearch('enable_nlq=false')(text, '{query}')", SortOrder.DESC, **kwargs)
|
||||
return self._search(
|
||||
"TextSearch('enable_nlq=false')(text, %(query)s)",
|
||||
SortOrder.DESC,
|
||||
parameters={"query": query},
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _search(self, dist: str, order: SortOrder, **kwargs: Any) -> list[Document]:
|
||||
@staticmethod
|
||||
def _build_in_params(prefix: str, values: list[str]) -> tuple[str, dict[str, str]]:
|
||||
params: dict[str, str] = {}
|
||||
placeholders = []
|
||||
for i, value in enumerate(values):
|
||||
name = f"{prefix}_{i}"
|
||||
placeholders.append(f"%({name})s")
|
||||
params[name] = value
|
||||
return ", ".join(placeholders), params
|
||||
|
||||
def _search(
|
||||
self,
|
||||
dist: str,
|
||||
order: SortOrder,
|
||||
parameters: dict[str, Any] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> list[Document]:
|
||||
top_k = kwargs.get("top_k", 4)
|
||||
if not isinstance(top_k, int) or top_k <= 0:
|
||||
raise ValueError("top_k must be a positive integer")
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
where_str = (
|
||||
f"WHERE dist < {1 - score_threshold}"
|
||||
if self._metric.upper() == "COSINE" and order == SortOrder.ASC and score_threshold > 0.0
|
||||
else ""
|
||||
)
|
||||
where_clauses = []
|
||||
if self._metric.upper() == "COSINE" and order == SortOrder.ASC and score_threshold > 0.0:
|
||||
where_clauses.append(f"dist < {1 - score_threshold}")
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
query_params = dict(parameters or {})
|
||||
if document_ids_filter:
|
||||
document_ids = ", ".join(f"'{id}'" for id in document_ids_filter)
|
||||
where_str = f"{where_str} AND metadata['document_id'] in ({document_ids})"
|
||||
placeholders, params = self._build_in_params("document_id", document_ids_filter)
|
||||
where_clauses.append(f"metadata['document_id'] IN ({placeholders})")
|
||||
query_params.update(params)
|
||||
where_str = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
sql = f"""
|
||||
SELECT text, vector, metadata, {dist} as dist FROM {self._config.database}.{self._collection_name}
|
||||
SELECT text, vector, metadata, {dist} as dist FROM {self._qualified_table}
|
||||
{where_str} ORDER BY dist {order.value} LIMIT {top_k}
|
||||
"""
|
||||
try:
|
||||
@ -150,14 +199,14 @@ class MyScaleVector(BaseVector):
|
||||
vector=r["vector"],
|
||||
metadata=r["metadata"],
|
||||
)
|
||||
for r in self._client.query(sql).named_results()
|
||||
for r in self._client.query(sql, parameters=query_params).named_results()
|
||||
]
|
||||
except Exception:
|
||||
logger.exception("Vector search operation failed")
|
||||
return []
|
||||
|
||||
def delete(self):
|
||||
self._client.command(f"DROP TABLE IF EXISTS {self._config.database}.{self._collection_name}")
|
||||
self._client.command(f"DROP TABLE IF EXISTS {self._qualified_table}")
|
||||
|
||||
|
||||
class MyScaleVectorFactory(AbstractVectorFactory):
|
||||
|
||||
@ -2,7 +2,7 @@ import importlib
|
||||
import sys
|
||||
import types
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@ -24,6 +24,7 @@ def _build_fake_clickhouse_connect_module():
|
||||
class Client:
|
||||
def __init__(self):
|
||||
self.command = MagicMock()
|
||||
self.insert = MagicMock()
|
||||
self.query = MagicMock(return_value=QueryResult())
|
||||
|
||||
client = Client()
|
||||
@ -58,9 +59,11 @@ def _config(module):
|
||||
)
|
||||
|
||||
|
||||
def test_escape_str_replaces_backslash_and_quote(myscale_module):
|
||||
escaped = myscale_module.MyScaleVector.escape_str(r"text\with'special")
|
||||
assert escaped == "text with special"
|
||||
def test_build_in_params_creates_named_placeholders(myscale_module):
|
||||
placeholders, params = myscale_module.MyScaleVector._build_in_params("document_id", ["doc-1", "doc-2"])
|
||||
|
||||
assert placeholders == "%(document_id_0)s, %(document_id_1)s"
|
||||
assert params == {"document_id_0": "doc-1", "document_id_1": "doc-2"}
|
||||
|
||||
|
||||
def test_search_raises_for_invalid_top_k(myscale_module):
|
||||
@ -172,9 +175,11 @@ def test_add_texts_inserts_rows_and_returns_ids(myscale_module, monkeypatch):
|
||||
ids = vector.add_texts(docs, [[0.1], [0.2], [0.3]])
|
||||
|
||||
assert ids == ["doc-a", "generated-uuid"]
|
||||
sql = vector._client.command.call_args.args[0]
|
||||
assert "INSERT INTO dify.collection_1" in sql
|
||||
assert "te xt 1" in sql
|
||||
vector._client.insert.assert_called_once()
|
||||
insert_table, insert_rows = vector._client.insert.call_args.args[:2]
|
||||
assert insert_table == "dify.collection_1"
|
||||
assert insert_rows[0][1] == r"te'xt\1"
|
||||
assert vector._client.insert.call_args.kwargs["column_names"] == ["id", "text", "vector", "metadata"]
|
||||
|
||||
|
||||
def test_text_exists_and_metadata_operations(myscale_module):
|
||||
@ -198,7 +203,22 @@ def test_search_delegation_methods(myscale_module):
|
||||
|
||||
assert result_vector == ["result"]
|
||||
assert result_text == ["result"]
|
||||
assert vector._search.call_count == 2
|
||||
vector._search.assert_has_calls(
|
||||
[
|
||||
call(
|
||||
"distance(vector, %(query_vector)s)",
|
||||
vector._vec_order,
|
||||
parameters={"query_vector": [0.1, 0.2]},
|
||||
top_k=2,
|
||||
),
|
||||
call(
|
||||
"TextSearch('enable_nlq=false')(text, %(query)s)",
|
||||
myscale_module.SortOrder.DESC,
|
||||
parameters={"query": "hello"},
|
||||
top_k=2,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_search_with_document_filter_and_exception(myscale_module):
|
||||
@ -215,7 +235,10 @@ def test_search_with_document_filter_and_exception(myscale_module):
|
||||
)
|
||||
assert len(docs) == 1
|
||||
sql = vector._client.query.call_args.args[0]
|
||||
assert "metadata['document_id'] in ('doc-1', 'doc-2')" in sql
|
||||
assert "metadata['document_id'] IN (%(document_id_0)s, %(document_id_1)s)" in sql
|
||||
query_params = vector._client.query.call_args.kwargs["parameters"]
|
||||
assert query_params["document_id_0"] == "doc-1"
|
||||
assert query_params["document_id_1"] == "doc-2"
|
||||
|
||||
vector._client.query.side_effect = RuntimeError("boom")
|
||||
assert vector._search("distance(vector, [0.1])", myscale_module.SortOrder.ASC, top_k=1) == []
|
||||
|
||||
@ -588,66 +588,6 @@ describe('useChat', () => {
|
||||
expect(lastResponse.workflowProcess?.status).toBe('failed')
|
||||
})
|
||||
|
||||
it('should keep separate iteration traces for repeated executions of the same iteration node', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const { result } = renderHook(() => useChat())
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'iteration trace test' }, {})
|
||||
})
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||
callbacks.onIterationStart({ data: { id: 'iter-run-1', node_id: 'iter-1' } })
|
||||
callbacks.onIterationStart({ data: { id: 'iter-run-2', node_id: 'iter-1' } })
|
||||
callbacks.onIterationFinish({ data: { id: 'iter-run-1', node_id: 'iter-1', status: 'succeeded' } })
|
||||
callbacks.onIterationFinish({ data: { id: 'iter-run-2', node_id: 'iter-1', status: 'succeeded' } })
|
||||
})
|
||||
|
||||
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing).toEqual(expect.arrayContaining([
|
||||
expect.objectContaining({ id: 'iter-run-1', status: 'succeeded' }),
|
||||
expect.objectContaining({ id: 'iter-run-2', status: 'succeeded' }),
|
||||
]))
|
||||
})
|
||||
|
||||
it('should keep separate top-level traces for repeated executions of the same node', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const { result } = renderHook(() => useChat())
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'top-level trace test' }, {})
|
||||
})
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||
callbacks.onNodeStarted({ data: { id: 'node-run-1', node_id: 'node-1', title: 'Node 1' } })
|
||||
callbacks.onNodeStarted({ data: { id: 'node-run-2', node_id: 'node-1', title: 'Node 1 retry' } })
|
||||
callbacks.onNodeFinished({ data: { id: 'node-run-1', node_id: 'node-1', status: 'succeeded' } })
|
||||
callbacks.onNodeFinished({ data: { id: 'node-run-2', node_id: 'node-1', status: 'succeeded' } })
|
||||
})
|
||||
|
||||
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing).toEqual(expect.arrayContaining([
|
||||
expect.objectContaining({ id: 'node-run-1', status: 'succeeded' }),
|
||||
expect.objectContaining({ id: 'node-run-2', status: 'succeeded' }),
|
||||
]))
|
||||
})
|
||||
|
||||
it('should handle early exits in tracing events during iteration or loop', async () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
@ -683,7 +623,7 @@ describe('useChat', () => {
|
||||
callbacks.onNodeFinished({ data: { id: 'n-1', iteration_id: 'iter-1' } })
|
||||
})
|
||||
|
||||
const traceLen1 = result.current.chatList.at(-1)!.workflowProcess?.tracing?.length
|
||||
const traceLen1 = result.current.chatList[result.current.chatList.length - 1].workflowProcess?.tracing?.length
|
||||
expect(traceLen1).toBe(0) // None added due to iteration early hits
|
||||
})
|
||||
|
||||
@ -767,7 +707,7 @@ describe('useChat', () => {
|
||||
|
||||
expect(result.current.chatList.some(item => item.id === 'question-m-child')).toBe(true)
|
||||
expect(result.current.chatList.some(item => item.id === 'm-child')).toBe(true)
|
||||
expect(result.current.chatList.at(-1)!.content).toBe('child answer')
|
||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('child answer')
|
||||
})
|
||||
|
||||
it('should strip local file urls before sending payload', () => {
|
||||
@ -865,7 +805,7 @@ describe('useChat', () => {
|
||||
})
|
||||
|
||||
expect(onGetConversationMessages).toHaveBeenCalled()
|
||||
expect(result.current.chatList.at(-1)!.content).toBe('streamed content')
|
||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('streamed content')
|
||||
})
|
||||
|
||||
it('should clear suggested questions when suggestion fetch fails after completion', async () => {
|
||||
@ -911,7 +851,7 @@ describe('useChat', () => {
|
||||
callbacks.onNodeFinished({ data: { node_id: 'n-loop', id: 'n-loop' } })
|
||||
})
|
||||
|
||||
const latestResponse = result.current.chatList.at(-1)!
|
||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
expect(latestResponse.workflowProcess?.tracing).toHaveLength(0)
|
||||
})
|
||||
|
||||
@ -938,7 +878,7 @@ describe('useChat', () => {
|
||||
callbacks.onTTSChunk('m-th-bind', '')
|
||||
})
|
||||
|
||||
const latestResponse = result.current.chatList.at(-1)!
|
||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
expect(latestResponse.id).toBe('m-th-bind')
|
||||
expect(latestResponse.conversationId).toBe('c-th-bind')
|
||||
expect(latestResponse.workflowProcess?.status).toBe('succeeded')
|
||||
@ -1031,7 +971,7 @@ describe('useChat', () => {
|
||||
callbacks.onCompleted()
|
||||
})
|
||||
|
||||
const lastResponse = result.current.chatList.at(-1)!
|
||||
const lastResponse = result.current.chatList[result.current.chatList.length - 1]
|
||||
expect(lastResponse.agent_thoughts![0].thought).toContain('resumed')
|
||||
|
||||
expect(lastResponse.workflowProcess?.tracing?.length).toBeGreaterThan(0)
|
||||
|
||||
@ -12,7 +12,6 @@ import type {
|
||||
IOnDataMoreInfo,
|
||||
IOtherOptions,
|
||||
} from '@/service/base'
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { uniqBy } from 'es-toolkit/compat'
|
||||
import { noop } from 'es-toolkit/function'
|
||||
import { produce, setAutoFreeze } from 'immer'
|
||||
@ -32,8 +31,6 @@ import {
|
||||
} from '@/app/components/base/file-uploader/utils'
|
||||
import { useToastContext } from '@/app/components/base/toast/context'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import useTimestamp from '@/hooks/use-timestamp'
|
||||
import { useParams, usePathname } from '@/next/navigation'
|
||||
import {
|
||||
@ -55,19 +52,6 @@ type SendCallback = {
|
||||
isPublicAPI?: boolean
|
||||
}
|
||||
|
||||
type ParallelTraceLike = Pick<NodeTracing, 'id' | 'node_id' | 'parallel_id' | 'execution_metadata'>
|
||||
|
||||
const findParallelTraceIndex = (
|
||||
tracing: ParallelTraceLike[],
|
||||
data: Partial<ParallelTraceLike>,
|
||||
) => {
|
||||
return findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId: data.execution_metadata?.parallel_id ?? data.parallel_id,
|
||||
})
|
||||
}
|
||||
|
||||
export const useChat = (
|
||||
config?: ChatConfig,
|
||||
formSettings?: {
|
||||
@ -435,7 +419,8 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
@ -447,34 +432,38 @@ export const useChat = (
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
// if the node is already started, update the node
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
@ -516,7 +505,8 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
@ -592,7 +582,7 @@ export const useChat = (
|
||||
{},
|
||||
otherOptions,
|
||||
)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer, params.loop_id])
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
|
||||
const updateCurrentQAOnTree = useCallback(({
|
||||
parentId,
|
||||
@ -982,13 +972,12 @@ export const useChat = (
|
||||
},
|
||||
onIterationFinish: ({ data: iterationFinishedData }) => {
|
||||
const tracing = responseItem.workflowProcess!.tracing!
|
||||
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
|
||||
updateCurrentQAOnTree({
|
||||
@ -999,19 +988,30 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
// `data` is the outer send payload for this request; loop child runs should not emit top-level node traces here.
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -1020,14 +1020,10 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
// Use the outer request payload here as well so loop child runs skip top-level finish handling entirely.
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.loop_id)
|
||||
if (data.loop_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
|
||||
@ -1073,13 +1069,12 @@ export const useChat = (
|
||||
},
|
||||
onLoopFinish: ({ data: loopFinishedData }) => {
|
||||
const tracing = responseItem.workflowProcess!.tracing!
|
||||
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
|
||||
updateCurrentQAOnTree({
|
||||
|
||||
@ -264,7 +264,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
fireEvent.change(input, { target: { value: longUrl } })
|
||||
await userEvent.type(input, longUrl)
|
||||
|
||||
expect(input).toHaveValue(longUrl)
|
||||
})
|
||||
@ -275,7 +275,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
fireEvent.change(input, { target: { value: unicodeUrl } })
|
||||
await userEvent.type(input, unicodeUrl)
|
||||
|
||||
expect(input).toHaveValue(unicodeUrl)
|
||||
})
|
||||
@ -285,7 +285,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
fireEvent.change(input, { target: { value: 'https://rapid.com' } })
|
||||
await userEvent.type(input, 'https://rapid.com', { delay: 1 })
|
||||
|
||||
expect(input).toHaveValue('https://rapid.com')
|
||||
})
|
||||
@ -297,7 +297,7 @@ describe('UrlInput', () => {
|
||||
|
||||
render(<UrlInput {...props} />)
|
||||
const input = screen.getByRole('textbox')
|
||||
fireEvent.change(input, { target: { value: 'https://enter.com' } })
|
||||
await userEvent.type(input, 'https://enter.com')
|
||||
|
||||
// Focus button and press enter
|
||||
const button = screen.getByRole('button', { name: /run/i })
|
||||
|
||||
@ -157,7 +157,7 @@ describe('useDatasetCardState', () => {
|
||||
expect(result.current.modalState.showRenameModal).toBe(false)
|
||||
})
|
||||
|
||||
it('should close confirm delete modal when closeConfirmDelete is called', async () => {
|
||||
it('should close confirm delete modal when closeConfirmDelete is called', () => {
|
||||
const dataset = createMockDataset()
|
||||
const { result } = renderHook(() =>
|
||||
useDatasetCardState({ dataset, onSuccess: vi.fn() }),
|
||||
@ -168,7 +168,7 @@ describe('useDatasetCardState', () => {
|
||||
result.current.detectIsUsedByApp()
|
||||
})
|
||||
|
||||
await waitFor(() => {
|
||||
waitFor(() => {
|
||||
expect(result.current.modalState.showConfirmDelete).toBe(true)
|
||||
})
|
||||
|
||||
|
||||
@ -101,7 +101,6 @@ const createHumanInput = (overrides: Partial<HumanInputFormData> = {}): HumanInp
|
||||
describe('workflow-stream-handlers helpers', () => {
|
||||
it('should update tracing, result text, and human input state', () => {
|
||||
const parallelTrace = createTrace({
|
||||
id: 'parallel-trace-1',
|
||||
node_id: 'parallel-node',
|
||||
execution_metadata: { parallel_id: 'parallel-1' },
|
||||
details: [[]],
|
||||
@ -110,13 +109,11 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
let workflowProcessData = appendParallelStart(undefined, parallelTrace)
|
||||
workflowProcessData = appendParallelNext(workflowProcessData, parallelTrace)
|
||||
workflowProcessData = finishParallelTrace(workflowProcessData, createTrace({
|
||||
id: 'parallel-trace-1',
|
||||
node_id: 'parallel-node',
|
||||
execution_metadata: { parallel_id: 'parallel-1' },
|
||||
error: 'failed',
|
||||
}))
|
||||
workflowProcessData = upsertWorkflowNode(workflowProcessData, createTrace({
|
||||
id: 'node-trace-1',
|
||||
node_id: 'node-1',
|
||||
execution_metadata: { parallel_id: 'parallel-2' },
|
||||
}))!
|
||||
@ -163,129 +160,6 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
expect(nextProcess.tracing[0]?.details).toEqual([[], []])
|
||||
})
|
||||
|
||||
it('should keep separate iteration and loop traces for repeated executions with different ids', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'iter-trace-1',
|
||||
node_id: 'iter-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'loop-trace-1',
|
||||
node_id: 'loop-1',
|
||||
details: [[]],
|
||||
}),
|
||||
createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
details: [[]],
|
||||
}),
|
||||
]
|
||||
|
||||
const iterNextProcess = appendParallelNext(process, createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
}))
|
||||
const iterFinishedProcess = finishParallelTrace(iterNextProcess, createTrace({
|
||||
id: 'iter-trace-2',
|
||||
node_id: 'iter-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
details: undefined,
|
||||
}))
|
||||
const loopNextProcess = appendParallelNext(iterFinishedProcess, createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
}))
|
||||
const loopFinishedProcess = finishParallelTrace(loopNextProcess, createTrace({
|
||||
id: 'loop-trace-2',
|
||||
node_id: 'loop-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
details: undefined,
|
||||
}))
|
||||
|
||||
expect(loopFinishedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||
id: 'iter-trace-1',
|
||||
details: [[]],
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'iter-trace-2',
|
||||
details: [[], []],
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[2]).toEqual(expect.objectContaining({
|
||||
id: 'loop-trace-1',
|
||||
details: [[]],
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
expect(loopFinishedProcess.tracing[3]).toEqual(expect.objectContaining({
|
||||
id: 'loop-trace-2',
|
||||
details: [[], []],
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should append a new top-level trace when the same node starts with a different execution id', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
]
|
||||
|
||||
const updatedProcess = upsertWorkflowNode(process, createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
}))!
|
||||
|
||||
expect(updatedProcess.tracing).toHaveLength(2)
|
||||
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should finish the matching top-level trace when the same node runs again with a new execution id', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}),
|
||||
]
|
||||
|
||||
const updatedProcess = finishWorkflowNode(process, createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))!
|
||||
|
||||
expect(updatedProcess.tracing).toHaveLength(2)
|
||||
expect(updatedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||
id: 'trace-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||
id: 'trace-2',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}))
|
||||
})
|
||||
|
||||
it('should leave tracing unchanged when a parallel next event has no matching trace', () => {
|
||||
const process = createWorkflowProcess()
|
||||
process.tracing = [
|
||||
@ -297,7 +171,6 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
]
|
||||
|
||||
const nextProcess = appendParallelNext(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing-node',
|
||||
execution_metadata: { parallel_id: 'parallel-2' },
|
||||
}))
|
||||
@ -355,7 +228,6 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
},
|
||||
}))
|
||||
const notFinished = finishParallelTrace(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing',
|
||||
execution_metadata: {
|
||||
parallel_id: 'parallel-missing',
|
||||
@ -371,7 +243,6 @@ describe('workflow-stream-handlers helpers', () => {
|
||||
loop_id: 'loop-1',
|
||||
}))
|
||||
const unmatchedFinish = finishWorkflowNode(process, createTrace({
|
||||
id: 'trace-missing',
|
||||
node_id: 'missing',
|
||||
execution_metadata: {
|
||||
parallel_id: 'missing',
|
||||
|
||||
@ -5,8 +5,6 @@ import type { HumanInputFormTimeoutData, NodeTracing, WorkflowFinishedResponse }
|
||||
import { produce } from 'immer'
|
||||
import { getFilesInLogs } from '@/app/components/base/file-uploader/utils'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||
import { findTracingIndexByExecutionOrUniqueNodeId } from '@/app/components/workflow/utils/tracing-execution'
|
||||
import { sseGet } from '@/service/base'
|
||||
|
||||
type Notify = (payload: { type: 'error' | 'warning', message: string }) => void
|
||||
@ -51,20 +49,6 @@ const matchParallelTrace = (trace: WorkflowProcess['tracing'][number], data: Nod
|
||||
|| trace.parallel_id === data.execution_metadata?.parallel_id)
|
||||
}
|
||||
|
||||
const findParallelTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||
const parallelId = data.execution_metadata?.parallel_id
|
||||
const matchedIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: data.id,
|
||||
nodeId: data.node_id,
|
||||
parallelId,
|
||||
})
|
||||
|
||||
if (matchedIndex > -1)
|
||||
return matchedIndex
|
||||
|
||||
return tracing.findIndex(trace => matchParallelTrace(trace, data))
|
||||
}
|
||||
|
||||
const ensureParallelTraceDetails = (details?: NodeTracing['details']) => {
|
||||
return details?.length ? details : [[]]
|
||||
}
|
||||
@ -84,8 +68,7 @@ const appendParallelStart = (current: WorkflowProcess | undefined, data: NodeTra
|
||||
const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
const trace = draft.tracing[traceIndex]
|
||||
const trace = draft.tracing.find(item => matchParallelTrace(item, data))
|
||||
if (!trace)
|
||||
return
|
||||
|
||||
@ -97,13 +80,10 @@ const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
const finishParallelTrace = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
const traceIndex = draft.tracing.findIndex(item => matchParallelTrace(item, data))
|
||||
if (traceIndex > -1) {
|
||||
const currentTrace = draft.tracing[traceIndex]
|
||||
draft.tracing[traceIndex] = {
|
||||
...currentTrace,
|
||||
...data,
|
||||
details: data.details ?? currentTrace.details,
|
||||
expand: !!data.error,
|
||||
}
|
||||
}
|
||||
@ -116,13 +96,17 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
draft.expand = true
|
||||
const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id)
|
||||
const nextTrace = {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
expand: true,
|
||||
}
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(draft.tracing, nextTrace)
|
||||
if (currentIndex > -1)
|
||||
draft.tracing[currentIndex] = nextTrace
|
||||
else
|
||||
draft.tracing.push(nextTrace)
|
||||
})
|
||||
}
|
||||
|
||||
@ -131,7 +115,7 @@ const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||
return current
|
||||
|
||||
return updateWorkflowProcess(current, (draft) => {
|
||||
const currentIndex = findParallelTraceIndex(draft.tracing, data)
|
||||
const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data))
|
||||
if (currentIndex > -1) {
|
||||
draft.tracing[currentIndex] = {
|
||||
...(draft.tracing[currentIndex].extras
|
||||
|
||||
@ -109,13 +109,13 @@ describe('useWorkflowAgentLog', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{ id: 'trace-1', node_id: 'n1', execution_metadata: {} }],
|
||||
tracing: [{ node_id: 'n1', execution_metadata: {} }],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||
data: { node_id: 'n1', message_id: 'm1' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
const trace = store.getState().workflowRunningData!.tracing![0]
|
||||
@ -128,7 +128,6 @@ describe('useWorkflowAgentLog', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{
|
||||
id: 'trace-1',
|
||||
node_id: 'n1',
|
||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'log1' }] },
|
||||
}],
|
||||
@ -137,7 +136,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm2' },
|
||||
data: { node_id: 'n1', message_id: 'm2' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(2)
|
||||
@ -148,7 +147,6 @@ describe('useWorkflowAgentLog', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{
|
||||
id: 'trace-1',
|
||||
node_id: 'n1',
|
||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'old' }] },
|
||||
}],
|
||||
@ -157,7 +155,7 @@ describe('useWorkflowAgentLog', () => {
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1', text: 'new' },
|
||||
data: { node_id: 'n1', message_id: 'm1', text: 'new' },
|
||||
} as unknown as AgentLogResponse)
|
||||
|
||||
const log = store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log!
|
||||
@ -169,39 +167,17 @@ describe('useWorkflowAgentLog', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [{ id: 'trace-1', node_id: 'n1' }],
|
||||
tracing: [{ node_id: 'n1' }],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||
data: { node_id: 'n1', message_id: 'm1' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('should attach the log to the matching execution id when a node runs multiple times', () => {
|
||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ id: 'trace-1', node_id: 'n1', execution_metadata: {} },
|
||||
{ id: 'trace-2', node_id: 'n1', execution_metadata: {} },
|
||||
],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
result.current.handleWorkflowAgentLog({
|
||||
data: { node_id: 'n1', node_execution_id: 'trace-2', message_id: 'm2' },
|
||||
} as AgentLogResponse)
|
||||
|
||||
const tracing = store.getState().workflowRunningData!.tracing!
|
||||
expect(tracing[0].execution_metadata!.agent_log).toBeUndefined()
|
||||
expect(tracing[1].execution_metadata!.agent_log).toHaveLength(1)
|
||||
expect(tracing[1].execution_metadata!.agent_log![0].message_id).toBe('m2')
|
||||
})
|
||||
})
|
||||
|
||||
describe('useWorkflowNodeHumanInputFormFilled', () => {
|
||||
|
||||
@ -109,7 +109,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { id: 'trace-n1', node_id: 'n1' } } as NodeStartedResponse,
|
||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@ -138,7 +138,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { id: 'trace-n2', node_id: 'n2' } } as NodeStartedResponse,
|
||||
{ data: { node_id: 'n2' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@ -157,8 +157,8 @@ describe('useWorkflowNodeStarted', () => {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
{ node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
],
|
||||
}),
|
||||
},
|
||||
@ -166,7 +166,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { id: 'trace-1', node_id: 'n1' } } as NodeStartedResponse,
|
||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
@ -175,32 +175,6 @@ describe('useWorkflowNodeStarted', () => {
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing[1].status).toBe(NodeRunningStatus.Running)
|
||||
})
|
||||
|
||||
it('should append a new tracing entry when the same node starts a new execution id', () => {
|
||||
const { result, store } = renderViewportHook(() => useWorkflowNodeStarted(), {
|
||||
initialStoreState: {
|
||||
workflowRunningData: baseRunningData({
|
||||
tracing: [
|
||||
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||
],
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
act(() => {
|
||||
result.current.handleWorkflowNodeStarted(
|
||||
{ data: { id: 'trace-2', node_id: 'n1' } } as NodeStartedResponse,
|
||||
containerParams,
|
||||
)
|
||||
})
|
||||
|
||||
const tracing = store.getState().workflowRunningData!.tracing!
|
||||
expect(tracing).toHaveLength(3)
|
||||
expect(tracing[2].id).toBe('trace-2')
|
||||
expect(tracing[2].node_id).toBe('n1')
|
||||
expect(tracing[2].status).toBe(NodeRunningStatus.Running)
|
||||
})
|
||||
})
|
||||
|
||||
describe('useWorkflowNodeIterationStarted', () => {
|
||||
|
||||
@ -14,7 +14,7 @@ export const useWorkflowAgentLog = () => {
|
||||
} = workflowStore.getState()
|
||||
|
||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||
const currentIndex = draft.tracing!.findIndex(item => item.id === data.node_execution_id)
|
||||
const currentIndex = draft.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentIndex > -1) {
|
||||
const current = draft.tracing![currentIndex]
|
||||
|
||||
|
||||
@ -33,8 +33,8 @@ export const useWorkflowNodeStarted = () => {
|
||||
transform,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.id === data.id)
|
||||
if (currentIndex !== undefined && currentIndex > -1) {
|
||||
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentIndex && currentIndex > -1) {
|
||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||
draft.tracing![currentIndex] = {
|
||||
...data,
|
||||
|
||||
@ -42,12 +42,6 @@ import {
|
||||
import { useHooksStore } from '../../hooks-store'
|
||||
import { useWorkflowStore } from '../../store'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '../../types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '../../utils/top-level-tracing'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from '../../utils/tracing-execution'
|
||||
|
||||
type GetAbortController = (abortController: AbortController) => void
|
||||
type SendCallback = {
|
||||
@ -474,7 +468,10 @@ export const useChat = (
|
||||
onIterationFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -498,7 +495,10 @@ export const useChat = (
|
||||
onLoopFinish: ({ data }) => {
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -508,15 +508,19 @@ export const useChat = (
|
||||
}
|
||||
},
|
||||
onNodeStarted: ({ data }) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess!.tracing!, {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
}, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing![currentIndex] = {
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
responseItem.workflowProcess!.tracing!.push({
|
||||
...data,
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -535,12 +539,12 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data }) => {
|
||||
if (params.loop_id)
|
||||
return
|
||||
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data)
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
questionItem,
|
||||
@ -550,10 +554,7 @@ export const useChat = (
|
||||
}
|
||||
},
|
||||
onAgentLog: ({ data }) => {
|
||||
const currentNodeIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess!.tracing!, {
|
||||
executionId: data.node_execution_id,
|
||||
nodeId: data.node_id,
|
||||
})
|
||||
const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
||||
if (currentNodeIndex > -1) {
|
||||
const current = responseItem.workflowProcess!.tracing![currentNodeIndex]
|
||||
|
||||
@ -768,8 +769,7 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...iterationStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@ -780,14 +780,12 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: iterationFinishedData.id,
|
||||
nodeId: iterationFinishedData.node_id,
|
||||
parallelId: iterationFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[iterationIndex], iterationFinishedData),
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
@ -800,12 +798,22 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
@ -816,17 +824,14 @@ export const useChat = (
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess.tracing, {
|
||||
executionId: nodeFinishedData.id,
|
||||
nodeId: nodeFinishedData.node_id,
|
||||
parallelId: nodeFinishedData.execution_metadata?.parallel_id,
|
||||
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
|
||||
})
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData) as any
|
||||
}
|
||||
if (currentIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
|
||||
})
|
||||
},
|
||||
onLoopStart: ({ data: loopStartedData }) => {
|
||||
@ -835,8 +840,7 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...loopStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@ -847,14 +851,12 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: loopFinishedData.id,
|
||||
nodeId: loopFinishedData.node_id,
|
||||
parallelId: loopFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[loopIndex], loopFinishedData),
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,174 +0,0 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { NodeRunningStatus } from '@/app/components/workflow/types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from './top-level-tracing'
|
||||
|
||||
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||
id: 'trace-1',
|
||||
index: 0,
|
||||
predecessor_node_id: '',
|
||||
node_id: 'node-1',
|
||||
node_type: 'llm' as NodeTracing['node_type'],
|
||||
title: 'Node 1',
|
||||
inputs: {},
|
||||
inputs_truncated: false,
|
||||
process_data: {},
|
||||
process_data_truncated: false,
|
||||
outputs: {},
|
||||
outputs_truncated: false,
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
elapsed_time: 0,
|
||||
metadata: {
|
||||
iterator_length: 0,
|
||||
iterator_index: 0,
|
||||
loop_length: 0,
|
||||
loop_index: 0,
|
||||
},
|
||||
created_at: 0,
|
||||
created_by: {
|
||||
id: 'user-1',
|
||||
name: 'User',
|
||||
email: 'user@example.com',
|
||||
},
|
||||
finished_at: 0,
|
||||
...overrides,
|
||||
})
|
||||
|
||||
describe('upsertTopLevelTracingNodeOnStart', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('should append a new top-level node when no matching trace exists', () => {
|
||||
const tracing: NodeTracing[] = []
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-2',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should update an existing top-level node when the execution id matches', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
}),
|
||||
]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should append a new top-level node when the same node starts with a new execution id', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([existingTrace, startedNode])
|
||||
})
|
||||
|
||||
it('should update an existing running top-level node when the same node restarts with a new execution id', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
}),
|
||||
]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode, {
|
||||
reuseRunningNodeId: true,
|
||||
})
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([startedNode])
|
||||
})
|
||||
|
||||
it('should keep separate running top-level traces by default when a new execution id appears', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'trace-1',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const startedNode = createTrace({
|
||||
id: 'trace-2',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||
|
||||
expect(updated).toBe(true)
|
||||
expect(tracing).toEqual([existingTrace, startedNode])
|
||||
})
|
||||
|
||||
it('should ignore nested iteration node starts even when the node id matches a top-level trace', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'top-level-trace',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const nestedIterationTrace = createTrace({
|
||||
id: 'iteration-trace',
|
||||
node_id: 'node-1',
|
||||
iteration_id: 'iteration-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedIterationTrace)
|
||||
|
||||
expect(updated).toBe(false)
|
||||
expect(tracing).toEqual([existingTrace])
|
||||
})
|
||||
|
||||
it('should ignore nested loop node starts even when the node id matches a top-level trace', () => {
|
||||
const existingTrace = createTrace({
|
||||
id: 'top-level-trace',
|
||||
node_id: 'node-1',
|
||||
status: NodeRunningStatus.Succeeded,
|
||||
})
|
||||
const tracing: NodeTracing[] = [existingTrace]
|
||||
const nestedLoopTrace = createTrace({
|
||||
id: 'loop-trace',
|
||||
node_id: 'node-1',
|
||||
loop_id: 'loop-1',
|
||||
status: NodeRunningStatus.Running,
|
||||
})
|
||||
|
||||
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedLoopTrace)
|
||||
|
||||
expect(updated).toBe(false)
|
||||
expect(tracing).toEqual([existingTrace])
|
||||
})
|
||||
})
|
||||
@ -1,34 +0,0 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
import { NodeRunningStatus } from '../types'
|
||||
|
||||
const isNestedTracingNode = (trace: Pick<NodeTracing, 'iteration_id' | 'loop_id'>) => {
|
||||
return Boolean(trace.iteration_id || trace.loop_id)
|
||||
}
|
||||
|
||||
export const upsertTopLevelTracingNodeOnStart = (
|
||||
tracing: NodeTracing[],
|
||||
startedNode: NodeTracing,
|
||||
options?: {
|
||||
reuseRunningNodeId?: boolean
|
||||
},
|
||||
) => {
|
||||
if (isNestedTracingNode(startedNode))
|
||||
return false
|
||||
|
||||
const currentIndex = tracing.findIndex((item) => {
|
||||
if (item.id === startedNode.id)
|
||||
return true
|
||||
|
||||
if (!options?.reuseRunningNodeId)
|
||||
return false
|
||||
|
||||
return item.node_id === startedNode.node_id && item.status === NodeRunningStatus.Running
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
// Started events are the authoritative snapshot for an execution; merging would retain stale client-side fields.
|
||||
tracing[currentIndex] = startedNode
|
||||
else
|
||||
tracing.push(startedNode)
|
||||
|
||||
return true
|
||||
}
|
||||
@ -1,136 +0,0 @@
|
||||
import type { AgentLogItem, NodeTracing } from '@/types/workflow'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from './tracing-execution'
|
||||
|
||||
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||
id: 'trace-1',
|
||||
index: 0,
|
||||
predecessor_node_id: '',
|
||||
node_id: 'node-1',
|
||||
node_type: 'llm' as NodeTracing['node_type'],
|
||||
title: 'Node 1',
|
||||
inputs: {},
|
||||
inputs_truncated: false,
|
||||
process_data: {},
|
||||
process_data_truncated: false,
|
||||
outputs: {},
|
||||
outputs_truncated: false,
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
elapsed_time: 0,
|
||||
metadata: {
|
||||
iterator_length: 0,
|
||||
iterator_index: 0,
|
||||
loop_length: 0,
|
||||
loop_index: 0,
|
||||
},
|
||||
created_at: 0,
|
||||
created_by: {
|
||||
id: 'user-1',
|
||||
name: 'User',
|
||||
email: 'user@example.com',
|
||||
},
|
||||
finished_at: 0,
|
||||
...overrides,
|
||||
})
|
||||
|
||||
describe('tracing-execution utils', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('should prefer the exact execution id when the same node ran multiple times', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'trace-2',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(1)
|
||||
})
|
||||
|
||||
it('should fall back to a unique node id when the execution id is missing', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(0)
|
||||
})
|
||||
|
||||
it('should not fall back to node id when multiple executions exist', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(-1)
|
||||
})
|
||||
|
||||
it('should merge into an existing resume trace instead of appending a duplicate', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1', title: 'old title' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ node_id: 'node-1', title: 'new title' }))
|
||||
|
||||
expect(tracing).toHaveLength(1)
|
||||
expect(tracing[0].id).toBe('trace-1')
|
||||
expect(tracing[0].title).toBe('new title')
|
||||
})
|
||||
|
||||
it('should append a new trace when a new execution id appears', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ id: 'trace-2', node_id: 'node-1', title: 'second run' }))
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing[1].id).toBe('trace-2')
|
||||
})
|
||||
|
||||
it('should preserve agent logs when merging finish metadata', () => {
|
||||
const agentLogItem: AgentLogItem = {
|
||||
node_execution_id: 'trace-1',
|
||||
message_id: 'm-1',
|
||||
node_id: 'node-1',
|
||||
label: 'tool',
|
||||
data: {},
|
||||
status: 'success',
|
||||
}
|
||||
|
||||
const currentNode = createTrace({
|
||||
execution_metadata: {
|
||||
total_tokens: 1,
|
||||
total_price: 0,
|
||||
currency: 'USD',
|
||||
agent_log: [agentLogItem],
|
||||
parallel_id: 'p-1',
|
||||
},
|
||||
})
|
||||
|
||||
const mergedNode = mergeTracingNodePreservingExecutionMetadata(currentNode, {
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
execution_metadata: {
|
||||
total_tokens: 2,
|
||||
total_price: 1,
|
||||
currency: 'USD',
|
||||
parallel_id: 'p-1',
|
||||
extra: 'value',
|
||||
} as NodeTracing['execution_metadata'],
|
||||
})
|
||||
|
||||
expect(mergedNode.execution_metadata?.agent_log).toEqual([agentLogItem])
|
||||
expect((mergedNode.execution_metadata as Record<string, unknown>).extra).toBe('value')
|
||||
})
|
||||
})
|
||||
@ -1,76 +0,0 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
|
||||
type TracingLookup = {
|
||||
executionId?: string
|
||||
nodeId?: string
|
||||
parallelId?: string
|
||||
allowNodeIdFallbackWhenExecutionIdMissing?: boolean
|
||||
}
|
||||
|
||||
const getParallelId = (trace: Partial<NodeTracing>) => {
|
||||
return trace.execution_metadata?.parallel_id || trace.parallel_id
|
||||
}
|
||||
|
||||
export const findTracingIndexByExecutionOrUniqueNodeId = (
|
||||
tracing: Partial<NodeTracing>[],
|
||||
{ executionId, nodeId, parallelId, allowNodeIdFallbackWhenExecutionIdMissing = true }: TracingLookup,
|
||||
) => {
|
||||
if (executionId) {
|
||||
const exactIndex = tracing.findIndex(item => item.id === executionId)
|
||||
if (exactIndex > -1)
|
||||
return exactIndex
|
||||
|
||||
if (!allowNodeIdFallbackWhenExecutionIdMissing)
|
||||
return -1
|
||||
}
|
||||
|
||||
if (!nodeId)
|
||||
return -1
|
||||
|
||||
const candidates = tracing
|
||||
.map((item, index) => ({ item, index }))
|
||||
.filter(({ item }) => item.node_id === nodeId)
|
||||
.filter(({ item }) => !parallelId || getParallelId(item) === parallelId)
|
||||
|
||||
return candidates.length === 1 ? candidates[0].index : -1
|
||||
}
|
||||
|
||||
export const upsertTracingNodeOnResumeStart = (
|
||||
tracing: NodeTracing[],
|
||||
startedNode: NodeTracing,
|
||||
) => {
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: startedNode.id,
|
||||
nodeId: startedNode.node_id,
|
||||
parallelId: getParallelId(startedNode),
|
||||
allowNodeIdFallbackWhenExecutionIdMissing: false,
|
||||
})
|
||||
|
||||
if (currentIndex > -1) {
|
||||
tracing[currentIndex] = {
|
||||
...tracing[currentIndex],
|
||||
...startedNode,
|
||||
}
|
||||
return currentIndex
|
||||
}
|
||||
|
||||
tracing.push(startedNode)
|
||||
return tracing.length - 1
|
||||
}
|
||||
|
||||
export const mergeTracingNodePreservingExecutionMetadata = (
|
||||
currentNode: NodeTracing,
|
||||
incomingNode: Partial<NodeTracing>,
|
||||
): NodeTracing => {
|
||||
return {
|
||||
...currentNode,
|
||||
...incomingNode,
|
||||
execution_metadata: incomingNode.execution_metadata
|
||||
? {
|
||||
...currentNode.execution_metadata,
|
||||
...incomingNode.execution_metadata,
|
||||
agent_log: incomingNode.execution_metadata.agent_log ?? currentNode.execution_metadata?.agent_log,
|
||||
}
|
||||
: currentNode.execution_metadata,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user