From 2369deb1aa5b3f301a36be2cbd76a9c3b6668d19 Mon Sep 17 00:00:00 2001 From: Ryo Date: Sat, 25 Oct 2025 13:49:11 +0800 Subject: [PATCH] fix: replace hardcoded DebugURLTpl in Workflow with a configurable value (#2388) --- .../api/handler/coze/workflow_service_test.go | 1171 ----------------- backend/application/workflow/chatflow.go | 5 +- backend/application/workflow/workflow.go | 16 +- backend/bizpkg/config/base/base.go | 2 +- backend/bizpkg/debugutil/workflow_debug.go | 58 + .../crossdomain/workflow/model/workflow.go | 2 - backend/domain/workflow/entity/vo/node.go | 9 +- 7 files changed, 76 insertions(+), 1187 deletions(-) create mode 100644 backend/bizpkg/debugutil/workflow_debug.go diff --git a/backend/api/handler/coze/workflow_service_test.go b/backend/api/handler/coze/workflow_service_test.go index d0df3be73..3f0016bdc 100644 --- a/backend/api/handler/coze/workflow_service_test.go +++ b/backend/api/handler/coze/workflow_service_test.go @@ -79,7 +79,6 @@ import ( crossuser "github.com/coze-dev/coze-studio/backend/crossdomain/user" agententity "github.com/coze-dev/coze-studio/backend/domain/conversation/agentrun/entity" conventity "github.com/coze-dev/coze-studio/backend/domain/conversation/conversation/entity" - msgentity "github.com/coze-dev/coze-studio/backend/domain/conversation/message/entity" entity4 "github.com/coze-dev/coze-studio/backend/domain/memory/database/entity" entity2 "github.com/coze-dev/coze-studio/backend/domain/openauth/openapiauth/entity" "github.com/coze-dev/coze-studio/backend/domain/plugin/dto" @@ -98,7 +97,6 @@ import ( "github.com/coze-dev/coze-studio/backend/infra/cache/impl/redis" "github.com/coze-dev/coze-studio/backend/infra/checkpoint" "github.com/coze-dev/coze-studio/backend/infra/coderunner" - "github.com/coze-dev/coze-studio/backend/infra/coderunner/impl/direct" mockCrossUser "github.com/coze-dev/coze-studio/backend/internal/mock/crossdomain/crossuser" mockPlugin "github.com/coze-dev/coze-studio/backend/internal/mock/domain/plugin" mockcode "github.com/coze-dev/coze-studio/backend/internal/mock/domain/workflow/crossdomain/code" @@ -106,7 +104,6 @@ import ( storageMock "github.com/coze-dev/coze-studio/backend/internal/mock/infra/storage" "github.com/coze-dev/coze-studio/backend/internal/testutil" "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" - "github.com/coze-dev/coze-studio/backend/pkg/errorx" "github.com/coze-dev/coze-studio/backend/pkg/lang/ptr" "github.com/coze-dev/coze-studio/backend/pkg/lang/slices" "github.com/coze-dev/coze-studio/backend/pkg/sonic" @@ -1173,62 +1170,6 @@ func TestNodeTemplateList(t *testing.T) { } -func TestTestRunAndGetProcess(t *testing.T) { - mockey.PatchConvey("test test_run and get_process", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - r.appVarS.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return("1.0", nil).AnyTimes() - - id := r.load("entry_exit.json") - input := map[string]string{ - "arr": "[\"arr1\", \"arr2\"]", - "obj": "{\"field1\": [\"1234\", \"5678\"]}", - "input": "3.5", - } - - mockey.PatchConvey("test run then immediately cancel", func() { - exeID := r.testRun(id, input) - - r.cancel(id, exeID) - - e := r.getProcess(id, exeID) - // maybe cancel or success, whichever comes first - assert.Contains(t, []workflow.WorkflowExeStatus{workflow.WorkflowExeStatus_Cancel, workflow.WorkflowExeStatus_Success}, e.status) - }) - - mockey.PatchConvey("test run success, then cancel", func() { - exeID := r.testRun(id, input) - exeResult := r.getProcess(id, exeID) - exeResult.nodeResultHasResponseExtra(entity.ExitNodeKey, "terminal_plan", int64(2)) - - // cancel after success, nothing happens - r.cancel(id, exeID) - - his := r.getOpenAPIProcess(id, exeID) - assert.Equal(t, exeID, fmt.Sprintf("%d", *his.Data[0].ExecuteID)) - assert.Equal(t, workflow.WorkflowRunMode_Async, *his.Data[0].RunMode) - - r.publish(id, "v0.0.1", true) - - mockey.PatchConvey("openapi async run", func() { - exeID := r.openapiAsyncRun(id, input) - e := r.getProcess(id, exeID) - assert.Equal(t, "1.0_[\"1234\",\"5678\"]", e.output) - }) - - mockey.PatchConvey("openapi sync run", func() { - output, exeID := r.openapiSyncRun(id, input) - assert.Equal(t, "1.0_[\"1234\",\"5678\"]", output["data"]) - his := r.getOpenAPIProcess(id, exeID) - assert.Equal(t, exeID, fmt.Sprintf("%d", *his.Data[0].ExecuteID)) - assert.Equal(t, workflow.WorkflowRunMode_Sync, *his.Data[0].RunMode) - }) - }) - - }) -} - func TestValidateTree(t *testing.T) { mockey.PatchConvey("test validate tree", t, func() { r := newWfTestRunner(t) @@ -1346,114 +1287,6 @@ func TestValidateTree(t *testing.T) { }) } -func TestTestResumeWithInputNode(t *testing.T) { - mockey.PatchConvey("test test_resume with input node", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - id := r.load("input_receiver.json") - - userInput := map[string]any{ - "input": "user input", - "obj": map[string]any{ - "field1": []any{"1", "2"}, - }, - } - userInputStr, err := sonic.MarshalString(userInput) - assert.NoError(t, err) - - mockey.PatchConvey("cancel after interrupt", func() { - exeID := r.testRun(id, map[string]string{ - "input": "unused initial input", - }) - - e := r.getProcess(id, exeID) - assert.NotNil(t, e.event) // interrupted - - r.cancel(id, exeID) - - e = r.getProcess(id, exeID) - assert.Equal(t, workflow.WorkflowExeStatus_Cancel, e.status) - }) - - mockey.PatchConvey("cancel immediately after resume", func() { - exeID := r.testRun(id, map[string]string{ - "input": "unused initial input", - }) - - e := r.getProcess(id, exeID) - assert.NotNil(t, e.event) // interrupted - - r.testResume(id, exeID, e.event.ID, userInputStr) - r.cancel(id, exeID) - - e = r.getProcess(id, exeID, withPreviousEventID(e.event.ID)) - // maybe cancel or success, whichever comes first - if e.status != workflow.WorkflowExeStatus_Success && - e.status != workflow.WorkflowExeStatus_Cancel { - t.Errorf("expected to be either success or cancel, got: %v", e.status) - } - }) - - mockey.PatchConvey("test run, then test resume", func() { - exeID := r.testRun(id, map[string]string{ - "input": "unused initial input", - }) - - e := r.getProcess(id, exeID) - assert.NotNil(t, e.event) // interrupted - - r.testResume(id, exeID, e.event.ID, userInputStr) - - e = r.getProcess(id, exeID, withPreviousEventID(e.event.ID)) - assert.Equal(t, workflow.WorkflowExeStatus_Success, e.status) - assert.Equal(t, map[string]any{ - "input": "user input", - "inputArr": nil, - "field1": `["1","2"]`, - }, mustUnmarshalToMap(t, e.output)) - }) - - mockey.PatchConvey("node debug the input node", func() { - exeID := r.nodeDebug(id, "154951") - - e := r.getProcess(id, exeID) - assert.NotNil(t, e.event) // interrupted - - r.testResume(id, exeID, e.event.ID, userInputStr) - - e2 := r.getProcess(id, exeID, withPreviousEventID(e.event.ID)) - e2.assertSuccess() - assert.Equal(t, map[string]any{ - "input": "user input", - "inputArr": nil, - "obj": map[string]any{ - "field1": `["1","2"]`, - }, - }, mustUnmarshalToMap(t, e2.output)) - - result := r.getNodeExeHistory(id, exeID, "154951", nil) - assert.Equal(t, mustUnmarshalToMap(t, e2.output), mustUnmarshalToMap(t, result.Output)) - assert.Equal(t, mustUnmarshalToMap(t, e2.output), mustUnmarshalToMap(t, result.Input)) - }) - - mockey.PatchConvey("sync run does not support interrupt", func() { - r.publish(id, "v1.0.0", true) - - syncRunReq := &workflow.OpenAPIRunFlowRequest{ - WorkflowID: id, - Parameters: ptr.Of(mustMarshalToString(t, map[string]string{ - "input": "unused initial input", - })), - IsAsync: ptr.Of(false), - } - - resp := post[workflow.OpenAPIRunFlowResponse](r, syncRunReq) - assert.Equal(t, int64(errno.ErrOpenAPIInterruptNotSupported), resp.Code) - }) - }) -} - func TestQueryTypes(t *testing.T) { mockey.PatchConvey("test workflow node types", t, func() { r := newWfTestRunner(t) @@ -1725,82 +1558,6 @@ func TestUpdateWorkflowMeta(t *testing.T) { }) } -func TestStartNodeDefaultValues(t *testing.T) { - mockey.PatchConvey("default values", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - t.Run("no input keys, all fields use default values", func(t *testing.T) { - idStr := r.load("start_node_default_values.json") - r.publish(idStr, "v0.0.1", true) - input := map[string]string{} - result, _ := r.openapiSyncRun(idStr, input) - assert.Equal(t, result, map[string]any{ - "ts": "2025-07-09 21:43:34", - "files": "http://imagex.fanlv.fun/tos-cn-i-1heqlfnr21/e81acc11277f421390770618e24e01ce.jpeg~tplv-1heqlfnr21-image.image", - "str": "str", - "object": map[string]any{ - "a": "1", - }, - "array": []any{"1", "2"}, - "inter": int64(100), - "number": 12.4, - "bool": false, - }) - - }) - t.Run("all fields use default values", func(t *testing.T) { - idStr := r.load("start_node_default_values.json") - r.publish(idStr, "v0.0.1", true) - input := map[string]string{ - "str": "", - "array": "[]", - "object": "{}", - } - - result, _ := r.openapiSyncRun(idStr, input) - assert.Equal(t, result, map[string]any{ - "ts": "2025-07-09 21:43:34", - "files": "http://imagex.fanlv.fun/tos-cn-i-1heqlfnr21/e81acc11277f421390770618e24e01ce.jpeg~tplv-1heqlfnr21-image.image", - "str": "str", - "object": map[string]any{ - "a": "1", - }, - "array": []any{"1", "2"}, - "inter": int64(100), - "number": 12.4, - "bool": false, - }) - - }) - t.Run("some use default values and some use user-entered values", func(t *testing.T) { - idStr := r.load("start_node_default_values.json") - r.publish(idStr, "v0.0.1", true) - input := map[string]string{ - "str": "value", - "array": `["a","b"]`, - "object": "{}", - "bool": "true", - } - - result, _ := r.openapiSyncRun(idStr, input) - assert.Equal(t, result, map[string]any{ - "ts": "2025-07-09 21:43:34", - "files": "http://imagex.fanlv.fun/tos-cn-i-1heqlfnr21/e81acc11277f421390770618e24e01ce.jpeg~tplv-1heqlfnr21-image.image", - "str": "value", - "object": map[string]any{ - "a": "1", - }, - "array": []any{"a", "b"}, - "inter": int64(100), - "number": 12.4, - "bool": true, - }) - - }) - - }) -} - func TestListWorkflowAsToolData(t *testing.T) { mockey.PatchConvey("publish list workflow & list workflow as tool data", t, func() { r := newWfTestRunner(t) @@ -1909,140 +1666,6 @@ func TestInputComplex(t *testing.T) { }) } -func TestStreamResume(t *testing.T) { - mockey.PatchConvey("test stream resume", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - defer r.runServer()() - - id := r.load("input_complex.json") - - type expectedE struct { - ID string - Event appworkflow.StreamRunEventType - Data *streamRunData - } - - expectedEvents := []expectedE{ - { - ID: "0", - Event: appworkflow.MessageEvent, - Data: &streamRunData{ - NodeID: ptr.Of("191011"), - NodeType: ptr.Of("Input"), - NodeTitle: ptr.Of("输入"), - NodeSeqID: ptr.Of("0"), - NodeIsFinish: ptr.Of(true), - Content: ptr.Of("{\"content\":\"[{\\\"type\\\":\\\"object\\\",\\\"name\\\":\\\"input\\\",\\\"schema\\\":[{\\\"type\\\":\\\"string\\\",\\\"name\\\":\\\"name\\\",\\\"required\\\":false},{\\\"type\\\":\\\"integer\\\",\\\"name\\\":\\\"age\\\",\\\"required\\\":false}],\\\"required\\\":false},{\\\"type\\\":\\\"list\\\",\\\"name\\\":\\\"input_list\\\",\\\"schema\\\":{\\\"type\\\":\\\"object\\\",\\\"schema\\\":[{\\\"type\\\":\\\"string\\\",\\\"name\\\":\\\"name\\\",\\\"required\\\":false},{\\\"type\\\":\\\"integer\\\",\\\"name\\\":\\\"age\\\",\\\"required\\\":false}]},\\\"required\\\":false}]\",\"content_type\":\"form_schema\"}"), - ContentType: ptr.Of("text"), - }, - }, - { - ID: "1", - Event: appworkflow.InterruptEvent, - Data: &streamRunData{ - DebugURL: ptr.Of(fmt.Sprintf("https://www.coze.cn/work_flow?execute_id={{exeID}}&space_id=123&workflow_id=%s&execute_mode=2", id)), - InterruptData: &interruptData{ - EventID: "%s/%s", - Type: 5, - Data: "{\"content\":\"[{\\\"type\\\":\\\"object\\\",\\\"name\\\":\\\"input\\\",\\\"schema\\\":[{\\\"type\\\":\\\"string\\\",\\\"name\\\":\\\"name\\\",\\\"required\\\":false},{\\\"type\\\":\\\"integer\\\",\\\"name\\\":\\\"age\\\",\\\"required\\\":false}],\\\"required\\\":false},{\\\"type\\\":\\\"list\\\",\\\"name\\\":\\\"input_list\\\",\\\"schema\\\":{\\\"type\\\":\\\"object\\\",\\\"schema\\\":[{\\\"type\\\":\\\"string\\\",\\\"name\\\":\\\"name\\\",\\\"required\\\":false},{\\\"type\\\":\\\"integer\\\",\\\"name\\\":\\\"age\\\",\\\"required\\\":false}]},\\\"required\\\":false}]\",\"content_type\":\"form_schema\"}", - }, - }, - }, - } - - var ( - resumeID string - index int - ) - - r.publish(id, "v0.0.1", true) - - sseReader := r.openapiStream(id, map[string]any{}) - err := sseReader.ForEach(t.Context(), func(e *sse.Event) error { - t.Logf("sse id: %s, type: %s, data: %s", e.ID, e.Type, string(e.Data)) - if e.Type == string(appworkflow.InterruptEvent) { - var event streamRunData - err := sonic.Unmarshal(e.Data, &event) - assert.NoError(t, err) - resumeID = event.InterruptData.EventID - } - - var streamE streamRunData - err := sonic.Unmarshal(e.Data, &streamE) - assert.NoError(t, err) - debugURL := streamE.DebugURL - if debugURL != nil { - exeID := strings.TrimPrefix(strings.Split(*debugURL, "&")[0], "https://www.coze.cn/work_flow?execute_id=") - expectedEvents[index].Data.DebugURL = ptr.Of(strings.ReplaceAll(*debugURL, "{{exeID}}", exeID)) - } - if streamE.InterruptData != nil { - expectedEvents[index].Data.InterruptData.EventID = streamE.InterruptData.EventID - } - assert.Equal(t, expectedEvents[index], expectedE{ - ID: e.ID, - Event: appworkflow.StreamRunEventType(e.Type), - Data: &streamE, - }) - index++ - return nil - }) - assert.NoError(t, err) - - expectedEvents = []expectedE{ - { - ID: "0", - Event: appworkflow.MessageEvent, - Data: &streamRunData{ - NodeID: ptr.Of("900001"), - NodeType: ptr.Of("End"), - NodeTitle: ptr.Of("结束"), - NodeSeqID: ptr.Of("0"), - NodeIsFinish: ptr.Of(true), - Content: ptr.Of("{\"output\":{\"age\":1,\"name\":\"eino\"},\"output_list\":[{\"age\":null,\"name\":\"user_1\"},{\"age\":2,\"name\":null}]}"), - ContentType: ptr.Of("text"), - }, - }, - { - ID: "1", - Event: appworkflow.DoneEvent, - Data: &streamRunData{ - DebugURL: ptr.Of(fmt.Sprintf("https://www.coze.cn/work_flow?execute_id={{exeID}}&space_id=123&workflow_id=%s&execute_mode=2", id)), - }, - }, - } - - index = 0 - - sseReader = r.openapiResume(id, resumeID, mustMarshalToString(t, map[string]any{ - "input": `{"name": "eino", "age": 1}`, - "input_list": `[{"name":"user_1"},{"age":2}]`, - })) - err = sseReader.ForEach(t.Context(), func(e *sse.Event) error { - t.Logf("sse id: %s, type: %s, data: %s", e.ID, e.Type, string(e.Data)) - var streamE streamRunData - err := sonic.Unmarshal(e.Data, &streamE) - assert.NoError(t, err) - debugURL := streamE.DebugURL - if debugURL != nil { - exeID := strings.TrimPrefix(strings.Split(*debugURL, "&")[0], "https://www.coze.cn/work_flow?execute_id=") - expectedEvents[index].Data.DebugURL = ptr.Of(strings.ReplaceAll(*debugURL, "{{exeID}}", exeID)) - } - if streamE.InterruptData != nil { - expectedEvents[index].Data.InterruptData.EventID = streamE.InterruptData.EventID - } - assert.Equal(t, expectedEvents[index], expectedE{ - ID: e.ID, - Event: appworkflow.StreamRunEventType(e.Type), - Data: &streamE, - }) - index++ - return nil - }) - assert.NoError(t, err) - }) -} - func TestGetLLMNodeFCSettingsDetailAndMerged(t *testing.T) { mockey.PatchConvey("fc setting detail", t, func() { operationString := `{ @@ -2580,57 +2203,6 @@ func TestReleaseApplicationWorkflows(t *testing.T) { }) } -func TestCodeExceptionBranch(t *testing.T) { - mockey.PatchConvey("test code exception branch", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - id := r.load("exception/code_exception_branch.json") - - mockey.PatchConvey("exception branch", func() { - coderunner.SetCodeRunner(direct.NewRunner()) - - exeID := r.testRun(id, map[string]string{"input": "hello"}) - e := r.getProcess(id, exeID) - e.assertSuccess() - assert.Equal(t, map[string]any{ - "output": false, - "output1": "code result: false", - }, mustUnmarshalToMap(t, e.output)) - }) - - mockey.PatchConvey("normal branch", func() { - mockCodeRunner := mockcode.NewMockRunner(r.ctrl) - mockey.Mock(coderunner.GetCodeRunner).Return(mockCodeRunner).Build() - mockCodeRunner.EXPECT().Run(gomock.Any(), gomock.Any()).Return(&coderunner.RunResponse{ - Result: map[string]any{ - "key0": "value0", - "key1": []string{"value1", "value2"}, - "key2": map[string]any{}, - }, - }, nil).AnyTimes() - - exeID := r.testRun(id, map[string]string{"input": "hello"}) - e := r.getProcess(id, exeID) - e.assertSuccess() - assert.Equal(t, map[string]any{ - "output": true, - "output1": "", - }, mustUnmarshalToMap(t, e.output)) - - mockey.PatchConvey("sync run", func() { - r.publish(id, "v0.0.1", false) - - result, _ := r.openapiSyncRun(id, map[string]string{"input": "hello"}) - assert.Equal(t, map[string]any{ - "output": true, - "output1": "", - }, result) - }) - }) - }) -} - func TestCopyWorkflowAppToLibrary(t *testing.T) { r := newWfTestRunner(t) appworkflow.SVC.IDGenerator = r.idGen @@ -3240,579 +2812,6 @@ func TestJsonSerializationDeserializationWithWarning(t *testing.T) { }) } -func TestSetAppVariablesForSubProcesses(t *testing.T) { - mockey.PatchConvey("app variables for sub_process", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - r.appVarS.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return("1.0", nil).AnyTimes() - idStr := r.load("app_variables_for_sub_process.json") - r.publish(idStr, "v0.0.1", true) - result, _ := r.openapiSyncRun(idStr, map[string]any{ - "input": "ax", - }) - - assert.Equal(t, result, map[string]any{ - "output": "ax", - }) - - }) -} - -func TestHttpImplicitDependencies(t *testing.T) { - mockey.PatchConvey("test http implicit dependencies", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - r.appVarS.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return("1.0", nil).AnyTimes() - - idStr := r.load("httprequester/http_implicit_dependencies.json") - - r.publish(idStr, "v0.0.1", true) - - runner := mockcode.NewMockRunner(r.ctrl) - runner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, request *coderunner.RunRequest) (*coderunner.RunResponse, error) { - in := request.Params["input"] - _ = in - result := make(map[string]any) - err := sonic.UnmarshalString(in.(string), &result) - if err != nil { - return nil, err - } - - return &coderunner.RunResponse{ - Result: result, - }, nil - }).AnyTimes() - - coderunner.SetCodeRunner(runner) - - mockey.PatchConvey("test http node implicit dependencies", func() { - input := map[string]string{ - "input": "a", - } - result, _ := r.openapiSyncRun(idStr, input) - - batchRets := result["batch"].([]any) - loopRets := result["loop"].([]any) - - for _, r := range batchRets { - assert.Contains(t, []any{ - "http://echo.apifox.com/anything?aa=1.0&cc=1", - "http://echo.apifox.com/anything?aa=1.0&cc=2", - }, r) - } - for _, r := range loopRets { - assert.Contains(t, []any{ - "http://echo.apifox.com/anything?a=1&m=123", - "http://echo.apifox.com/anything?a=2&m=123", - }, r) - } - - }) - - mockey.PatchConvey("node debug http node implicit dependencies", func() { - exeID := r.nodeDebug(idStr, "109387", - withNDInput(map[string]string{ - "__apiInfo_url_87fc7c69536cae843fa7f5113cf0067b": "m", - "__apiInfo_url_ac86361e3cd503952e71986dc091fa6f": "a", - "__body_bodyData_json_ac86361e3cd503952e71986dc091fa6f": "b", - "__body_bodyData_json_f77817a7cf8441279e1cfd8af4eeb1da": "1", - })) - - e := r.getProcess(idStr, exeID, withSpecificNodeID("109387")) - e.assertSuccess() - - ret := make(map[string]any) - err := sonic.UnmarshalString(e.output, &ret) - assert.Nil(t, err) - err = sonic.UnmarshalString(ret["body"].(string), &ret) - assert.Nil(t, err) - assert.Equal(t, ret["url"].(string), "http://echo.apifox.com/anything?a=a&m=m") - - }) - - }) -} - -func TestMessageNodes(t *testing.T) { - mockey.PatchConvey("create message in dynamic conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - idStr := r.load("message/create_message.json") - r.publish(idStr, "v0.0.1", true) - - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - assert.Equal(t, true, ret["output"]) - assert.Equal(t, strconv.FormatInt(mID, 10), ret["mID"]) - }) - - mockey.PatchConvey("create message in static conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: "123", - ConversationName: "name" + strconv.FormatInt(cID, 10), - SpaceID: "123", - } - post[workflow.CreateProjectConversationDefResponse](r, createReq) - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - idStr := r.load("message/create_message.json") - testInput := map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunProjectID(123)) - e := r.getProcess(idStr, exeID) - e.assertSuccess() - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err, "Failed to unmarshal output JSON") - - assert.Equal(t, true, result["output"]) - assert.Equal(t, strconv.FormatInt(mID, 10), result["mID"]) - }) - - mockey.PatchConvey("create assistant message", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - r.message.EXPECT().GetLatestRunIDs(gomock.Any(), gomock.Any()).Return([]int64{rID}, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - idStr := r.load("message/create_assistant_message.json") - r.publish(idStr, "v0.0.1", true) - - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - assert.Equal(t, true, ret["output"]) - assert.Equal(t, strconv.FormatInt(mID, 10), ret["mID"]) - r.message.EXPECT().GetLatestRunIDs(gomock.Any(), gomock.Any()).Return([]int64{}, nil).AnyTimes() - ret, _ = r.openapiSyncRun(idStr, map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - assert.Equal(t, true, ret["output"]) - assert.Equal(t, strconv.FormatInt(mID, 10), ret["mID"]) - }) - - mockey.PatchConvey("create message in Bot scene", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - idStr := r.load("message/create_message_in_agent.json") - r.publish(idStr, "v0.0.1", true) - - testInput := map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunBotID(123)) - e := r.getProcess(idStr, exeID) - assert.Equal(t, e.status, workflow.WorkflowExeStatus_Fail) - assert.Contains(t, e.reason, "Only default conversation allow in agent scenario") - }) - - mockey.PatchConvey("create message without binding app nor bot", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - idStr := r.load("message/create_message_in_agent.json") - - testInput := map[string]string{ - "CONVERSATION_NAME": "Default", - } - exeID := r.testRun(idStr, testInput) - e := r.getProcess(idStr, exeID) - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err, "Failed to unmarshal output JSON") - - assert.Equal(t, false, result["isSuccess"]) - }) - - mockey.PatchConvey("query message list in dynamic conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().MessageList(gomock.Any(), gomock.Any()).Return(&message0.MessageListResponse{ - Messages: []*message0.WfMessage{ - { - ID: mID, - Role: "user", - ContentType: "text", - Text: ptr.Of("hello"), - }, - }, - }, nil).AnyTimes() - - idStr := r.load("message/message_list.json") - r.publish(idStr, "v0.0.1", true) - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - - mIDStr := strconv.FormatInt(mID, 10) - expected := []any{ - map[string]any{ - "messageId": mIDStr, - "role": "user", - "contentType": "text", - "content": "hello", - }, - } - assert.Equal(t, expected, ret["output"]) - }) - - mockey.PatchConvey("query message list in static conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: "123", - ConversationName: "name" + strconv.FormatInt(cID, 10), - SpaceID: "123", - } - post[workflow.CreateProjectConversationDefResponse](r, createReq) - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().MessageList(gomock.Any(), gomock.Any()).Return(&message0.MessageListResponse{ - Messages: []*message0.WfMessage{ - { - ID: mID, - Role: "user", - ContentType: "text", - Text: ptr.Of("hello"), - }, - }, - }, nil).AnyTimes() - - idStr := r.load("message/message_list.json") - testInput := map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunProjectID(123)) - e := r.getProcess(idStr, exeID) - e.assertSuccess() - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err) - - mIDStr := strconv.FormatInt(mID, 10) - expected := []any{ - map[string]any{ - "messageId": mIDStr, - "role": "user", - "contentType": "text", - "content": "hello", - }, - } - assert.Equal(t, expected, result["output"]) - }) - - mockey.PatchConvey("edit message in dynamic conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().Edit(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - ConversationID: cID, - }, nil).AnyTimes() - r.message.EXPECT().GetMessageByID(gomock.Any(), gomock.Any()).Return(&msgentity.Message{ - ID: mID, - ConversationID: cID, - Content: "123", - }, nil).AnyTimes() - - idStr := r.load("message/edit_message.json") - r.publish(idStr, "v0.0.1", true) - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - - assert.Equal(t, true, ret["isSuccess"]) - }) - - mockey.PatchConvey("edit message in static conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: "123", - ConversationName: "name" + strconv.FormatInt(cID, 10), - SpaceID: "123", - } - post[workflow.CreateProjectConversationDefResponse](r, createReq) - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().Edit(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - ConversationID: cID, - }, nil).AnyTimes() - r.message.EXPECT().GetMessageByID(gomock.Any(), gomock.Any()).Return(&msgentity.Message{ - ID: mID, - ConversationID: cID, - Content: "123", - }, nil).AnyTimes() - - idStr := r.load("message/edit_message.json") - testInput := map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunProjectID(123)) - e := r.getProcess(idStr, exeID) - e.assertSuccess() - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err) - - assert.Equal(t, true, result["isSuccess"]) - }) - - mockey.PatchConvey("edit message no permission", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - err := errorx.New(errno.ErrMessageNodeOperationFail, errorx.KV("cause", "message not found")) - r.message.EXPECT().Edit(gomock.Any(), gomock.Any()).Return(&message.Message{}, err).AnyTimes() - r.message.EXPECT().GetMessageByID(gomock.Any(), gomock.Any()).Return(&msgentity.Message{ - ConversationID: cID, - Content: "123456", - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - - idStr := r.load("message/edit_message_no_permission.json") - r.publish(idStr, "v0.0.1", true) - - testInput := map[string]string{ - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunProjectID(123)) - e := r.getProcess(idStr, exeID) - assert.Equal(t, e.status, workflow.WorkflowExeStatus_Fail) - assert.Contains(t, e.reason, "Message node operation failure: message not found") - }) - - mockey.PatchConvey("delete message in dynamic conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().Delete(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - - idStr := r.load("message/delete_message.json") - r.publish(idStr, "v0.0.1", true) - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - - assert.Equal(t, true, ret["isSuccess"]) - }) - - mockey.PatchConvey("delete message in static conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: "123", - ConversationName: "name" + strconv.FormatInt(cID, 10), - SpaceID: "123", - } - post[workflow.CreateProjectConversationDefResponse](r, createReq) - mID := time.Now().Unix() - r.message.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&message.Message{ - ID: mID, - }, nil).AnyTimes() - rID := time.Now().UnixNano() - r.agentRun.EXPECT().Create(gomock.Any(), gomock.Any()).Return(&agententity.RunRecordMeta{ - ID: rID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - r.message.EXPECT().Delete(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - - idStr := r.load("message/delete_message.json") - testInput := map[string]string{ - "USER_INPUT": "hello", - "CONVERSATION_NAME": "name" + strconv.FormatInt(cID, 10), - } - exeID := r.testRun(idStr, testInput, withRunProjectID(123)) - e := r.getProcess(idStr, exeID) - e.assertSuccess() - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err) - - assert.Equal(t, true, result["isSuccess"]) - }) - - mockey.PatchConvey("create, edit, delete message in agent default conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - idStr := r.load("message/agent_message.json") - - testInput := map[string]string{ - "CONVERSATION_NAME": "Default", - } - exeID := r.testRun(idStr, testInput) - e := r.getProcess(idStr, exeID) - output := e.output - var result map[string]any - err := sonic.Unmarshal([]byte(output), &result) - assert.NoError(t, err, "Failed to unmarshal output JSON") - - assert.Equal(t, false, result["create_success"]) - assert.Equal(t, false, result["edit_success"]) - assert.Equal(t, false, result["delete_success"]) - }) -} - func TestChatFlowRoleAPI(t *testing.T) { mockey.PatchConvey("chat flow role api", t, func() { r := newWfTestRunner(t) @@ -4077,133 +3076,6 @@ func TestConversationOfChatFlow(t *testing.T) { } -func TestConversationNodes(t *testing.T) { - mockey.PatchConvey("create conversation", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - sID := time.Now().UnixNano() - r.conversation.EXPECT().GetByID(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - SectionID: sID, - }, nil).AnyTimes() - - idStr := r.load("conversation_manager/create_conversation.json") - r.publish(idStr, "v0.0.1", true) - - ret, _ := r.openapiSyncRun(idStr, map[string]string{ - "input": "name" + strconv.FormatInt(cID, 10), - }, withRunProjectID(123)) - assert.Equal(t, strconv.FormatInt(cID, 10), ret["output"]) - }) - - mockey.PatchConvey("update template conversation name", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - - // update template name - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: "123", - ConversationName: "template_v1", - SpaceID: "123", - } - post[workflow.CreateProjectConversationDefResponse](r, createReq) - idStr := r.load("conversation_manager/update_conversation.json") - - execID := r.testRun(idStr, map[string]string{}, withRunProjectID(123)) - - e := r.getProcess(idStr, execID) - - assert.Equal(t, workflow.WorkflowExeStatus_Fail, e.status) - assert.Contains(t, e.reason, "Only conversation created through nodes are allowed to be modified or deleted") - }) - - mockey.PatchConvey("update & delete dynamic conversation name", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - cID := time.Now().Unix() - appID := time.Now().Unix() - r.conversation.EXPECT().CreateConversation(gomock.Any(), gomock.Any()).Return(&conventity.Conversation{ - ID: cID, - }, nil).AnyTimes() - - // create conversation and update conversation name - idStr := r.load("conversation_manager/update_dynamic_conversation.json") - execID := r.testRun(idStr, map[string]string{ - "input": "v1", - "new_name": "v2", - }, withRunProjectID(appID)) - - e := r.getProcess(idStr, execID) - - output := map[string]any{} - err := sonic.UnmarshalString(e.output, &output) - assert.Nil(t, err) - assert.Equal(t, map[string]any{"conversationId": strconv.FormatInt(cID, 10), "isExisted": false, "isSuccess": true}, output["obj"]) - - execID = r.testRun(idStr, map[string]string{ - "input": "v1", - "new_name": "v2", - }, withRunProjectID(appID)) - e = r.getProcess(idStr, execID) - output = map[string]any{} - err = sonic.UnmarshalString(e.output, &output) - assert.Nil(t, err) - assert.Equal(t, map[string]any{"conversationId": strconv.FormatInt(cID, 10), "isExisted": true, "isSuccess": false}, output["obj"]) - - // create template conversation & delete conversation - createReq := &workflow.CreateProjectConversationDefRequest{ - ProjectID: strconv.FormatInt(appID, 10), - ConversationName: "template_v1", - SpaceID: "123", - } - _ = post[workflow.CreateProjectConversationDefResponse](r, createReq) - - deleteIDStr := r.load("conversation_manager/delete_conversation.json") - - execID = r.testRun(deleteIDStr, map[string]string{ - "input": "template_v1", - }, withRunProjectID(appID)) - - e = r.getProcess(idStr, execID) - assert.Equal(t, workflow.WorkflowExeStatus_Fail, e.status) - assert.Contains(t, e.reason, "Only conversation created through nodes are allowed to be modified or deleted") - - //delete dynamic conversation - execID = r.testRun(deleteIDStr, map[string]string{ - "input": "v1", - }, withRunProjectID(appID)) - - e = r.getProcess(idStr, execID) - assert.Equal(t, workflow.WorkflowExeStatus_Success, e.status) - - //delete dynamic conversation - execID = r.testRun(deleteIDStr, map[string]string{ - "input": "v2", - }, withRunProjectID(appID)) - - e = r.getProcess(idStr, execID) - assert.Equal(t, workflow.WorkflowExeStatus_Success, e.status) - - execID = r.testRun(idStr, map[string]string{ - "input": "v1", - "new_name": "v2", - }, withRunProjectID(appID)) - e = r.getProcess(idStr, execID) - output = map[string]any{} - err = sonic.UnmarshalString(e.output, &output) - assert.Nil(t, err) - assert.Equal(t, map[string]any{"conversationId": strconv.FormatInt(cID, 10), "isExisted": false, "isSuccess": true}, output["obj"]) - }) -} - func TestConversationListNodes(t *testing.T) { mockey.PatchConvey("list dynamic conversation", t, func() { r := newWfTestRunner(t) @@ -4428,46 +3300,3 @@ func TestConversationHistoryNodes(t *testing.T) { assert.Equal(t, []any{}, outputMap["history_list"]) }) } - -func TestWorkflowRunWithFiles(t *testing.T) { - mockey.PatchConvey("workflow run with files", t, func() { - r := newWfTestRunner(t) - defer r.closeFn() - - r.knowledge.EXPECT().Store(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, document *knowledge.CreateDocumentRequest) (*knowledge.CreateDocumentResponse, error) { - - assert.Equal(t, "北京旅游景点.txt", document.FileName) - return &knowledge.CreateDocumentResponse{ - DocumentID: 1, - FileURL: document.FileURL, - FileName: document.FileName, - }, nil - }).AnyTimes() - - runner := mockcode.NewMockRunner(r.ctrl) - runner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, request *coderunner.RunRequest) (*coderunner.RunResponse, error) { - - return &coderunner.RunResponse{ - Result: request.Params, - }, nil - }).AnyTimes() - - mockey.Mock(coderunner.GetCodeRunner).Return(runner).Build() - - idStr := r.load("workflow_wf_file_name.json") - r.publish(idStr, "v0.1.1", true) - m, execID := r.openapiSyncRun(idStr, map[string]string{ - "f": "http://coze.fanlv.fun:8889/opencoze/tos-cn-i-v4nquku3lp/27b01dd5-b0f5-4dbd-a075-a48c14162d23.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250910%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250910T074412Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=2f3051a0645c9ed260f7cb6c93954147ceb347a61366c9f70b98d43c299a7732&x-wf-file_name=%E5%8C%97%E4%BA%AC%E6%97%85%E6%B8%B8%E6%99%AF%E7%82%B9.txt", - "fs": "[\"http://coze.fanlv.fun:8889/opencoze/tos-cn-i-v4nquku3lp/85056c12-ea40-4588-a2a2-5eab56b94e4c.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250910%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250910T074404Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=59f3a7b6774a33de127e42878e4821635ce74e1fc29237ba03b13d67a068fedf&x-wf-file_name=%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_2025-07-02_154139_105.jpg\",\"http://coze.fanlv.fun:8889/opencoze/tos-cn-i-v4nquku3lp/5ec9856d-0db0-44a1-9b82-43628221a928.jpeg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250910%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250910T074410Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=3887b0583084b0294b91e93e307c61ce3b910531d0e33a08e7c7d57de24c71ec&x-wf-file_name=20250317-154742.jpeg\"]", - }) - assert.NotNil(t, execID) - - assert.Equal(t, m["output"], []any{ - "http://coze.fanlv.fun:8889/opencoze/tos-cn-i-v4nquku3lp/85056c12-ea40-4588-a2a2-5eab56b94e4c.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250910%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250910T074404Z&X-Amz-Expires=604800&X-Amz-Signature=59f3a7b6774a33de127e42878e4821635ce74e1fc29237ba03b13d67a068fedf&X-Amz-SignedHeaders=host", - "http://coze.fanlv.fun:8889/opencoze/tos-cn-i-v4nquku3lp/5ec9856d-0db0-44a1-9b82-43628221a928.jpeg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250910%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250910T074410Z&X-Amz-Expires=604800&X-Amz-Signature=3887b0583084b0294b91e93e307c61ce3b910531d0e33a08e7c7d57de24c71ec&X-Amz-SignedHeaders=host"}) - assert.Equal(t, m["filename"], "北京旅游景点.txt") - fmt.Println(m, execID) - - }) - -} diff --git a/backend/application/workflow/chatflow.go b/backend/application/workflow/chatflow.go index c08207be3..f80505cfb 100644 --- a/backend/application/workflow/chatflow.go +++ b/backend/application/workflow/chatflow.go @@ -31,6 +31,7 @@ import ( "github.com/coze-dev/coze-studio/backend/api/model/workflow" "github.com/coze-dev/coze-studio/backend/application/base/ctxutil" + "github.com/coze-dev/coze-studio/backend/bizpkg/debugutil" crossagentrun "github.com/coze-dev/coze-studio/backend/crossdomain/agentrun" crossconversation "github.com/coze-dev/coze-studio/backend/crossdomain/conversation" crossmessage "github.com/coze-dev/coze-studio/backend/crossdomain/message" @@ -831,7 +832,7 @@ func (w *ApplicationService) convertToChatFlowRunResponseList(ctx context.Contex } doneData, err := sonic.MarshalString(map[string]interface{}{ - "debug_url": fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID), + "debug_url": debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID), }) if err != nil { return nil, err @@ -976,7 +977,7 @@ func (w *ApplicationService) convertToChatFlowRunResponseList(ctx context.Contex }) doneData, _ := sonic.MarshalString(map[string]interface{}{ - "debug_url": fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID), + "debug_url": debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID), }) responses = append(responses, &workflow.ChatFlowRunResponse{ diff --git a/backend/application/workflow/workflow.go b/backend/application/workflow/workflow.go index ff2279ad0..9b737d2ec 100644 --- a/backend/application/workflow/workflow.go +++ b/backend/application/workflow/workflow.go @@ -42,6 +42,7 @@ import ( appmemory "github.com/coze-dev/coze-studio/backend/application/memory" appplugin "github.com/coze-dev/coze-studio/backend/application/plugin" "github.com/coze-dev/coze-studio/backend/application/user" + "github.com/coze-dev/coze-studio/backend/bizpkg/debugutil" crossknowledge "github.com/coze-dev/coze-studio/backend/crossdomain/knowledge" model "github.com/coze-dev/coze-studio/backend/crossdomain/knowledge/model" pluginConsts "github.com/coze-dev/coze-studio/backend/crossdomain/plugin/consts" @@ -1394,6 +1395,7 @@ func convertStreamRunEvent(workflowID int64) func(msg *entity.Message) (res *wor } }() + ctx := context.Background() if msg.StateMessage != nil { // stream run will skip all messages from workflow tools if executeID > 0 && executeID != msg.StateMessage.ExecuteID { @@ -1405,7 +1407,7 @@ func convertStreamRunEvent(workflowID int64) func(msg *entity.Message) (res *wor return &workflow.OpenAPIStreamRunFlowResponse{ ID: strconv.Itoa(messageID), Event: string(DoneEvent), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID)), }, nil case entity.WorkflowFailed, entity.WorkflowCancel: var wfe vo.WorkflowError @@ -1415,7 +1417,7 @@ func convertStreamRunEvent(workflowID int64) func(msg *entity.Message) (res *wor return &workflow.OpenAPIStreamRunFlowResponse{ ID: strconv.Itoa(messageID), Event: string(ErrEvent), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID)), ErrorCode: ptr.Of(int64(wfe.Code())), ErrorMessage: ptr.Of(wfe.Msg()), }, nil @@ -1424,7 +1426,7 @@ func convertStreamRunEvent(workflowID int64) func(msg *entity.Message) (res *wor return &workflow.OpenAPIStreamRunFlowResponse{ ID: strconv.Itoa(messageID), Event: string(InterruptEvent), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID)), InterruptData: &workflow.Interrupt{ EventID: fmt.Sprintf("%d/%d", executeID, msg.InterruptEvent.ID), Type: workflow.InterruptType(msg.InterruptEvent.EventType), @@ -1436,7 +1438,7 @@ func convertStreamRunEvent(workflowID int64) func(msg *entity.Message) (res *wor return &workflow.OpenAPIStreamRunFlowResponse{ ID: strconv.Itoa(messageID), Event: string(InterruptEvent), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, executeID, spaceID, workflowID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, workflowID, spaceID, executeID)), InterruptData: &workflow.Interrupt{ EventID: fmt.Sprintf("%d/%d", executeID, msg.InterruptEvent.ID), Type: workflow.InterruptType(msg.InterruptEvent.ToolInterruptEvent.EventType), @@ -1729,7 +1731,7 @@ func (w *ApplicationService) OpenAPIRun(ctx context.Context, req *workflow.OpenA return &workflow.OpenAPIRunFlowResponse{ ExecuteID: ptr.Of(strconv.FormatInt(exeID, 10)), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, exeID, meta.SpaceID, meta.ID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, meta.ID, meta.SpaceID, exeID)), }, nil } @@ -1765,7 +1767,7 @@ func (w *ApplicationService) OpenAPIRun(ctx context.Context, req *workflow.OpenA return &workflow.OpenAPIRunFlowResponse{ Data: data, ExecuteID: ptr.Of(strconv.FormatInt(wfExe.ID, 10)), - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, wfExe.ID, wfExe.SpaceID, meta.ID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, meta.ID, wfExe.SpaceID, wfExe.ID)), Token: ptr.Of(wfExe.TokenInfo.InputTokens + wfExe.TokenInfo.OutputTokens), Cost: ptr.Of("0.00000"), }, nil @@ -1826,7 +1828,7 @@ func (w *ApplicationService) OpenAPIGetWorkflowRunHistory(ctx context.Context, r LogID: ptr.Of(exe.LogID), CreateTime: ptr.Of(exe.CreatedAt.Unix()), UpdateTime: updateTime, - DebugUrl: ptr.Of(fmt.Sprintf(workflowModel.DebugURLTpl, exe.ID, exe.SpaceID, exe.WorkflowID)), + DebugUrl: ptr.Of(debugutil.GetWorkflowDebugURL(ctx, exe.WorkflowID, exe.SpaceID, exe.ID)), Input: exe.Input, Output: exe.Output, Token: ptr.Of(exe.TokenInfo.InputTokens + exe.TokenInfo.OutputTokens), diff --git a/backend/bizpkg/config/base/base.go b/backend/bizpkg/config/base/base.go index ff41c6136..62d572d2e 100644 --- a/backend/bizpkg/config/base/base.go +++ b/backend/bizpkg/config/base/base.go @@ -106,7 +106,7 @@ func (c *BaseConfig) GetServerHost(ctx context.Context) (string, error) { host := cfg.ServerHost if host == "" { - return "", errors.New("server host is empty") + return "http://127.0.0.1:8888", nil } if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "https://") { diff --git a/backend/bizpkg/debugutil/workflow_debug.go b/backend/bizpkg/debugutil/workflow_debug.go new file mode 100644 index 000000000..c897ec1aa --- /dev/null +++ b/backend/bizpkg/debugutil/workflow_debug.go @@ -0,0 +1,58 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package debugutil + +import ( + "context" + "fmt" + "net/url" + "strconv" + + "github.com/coze-dev/coze-studio/backend/bizpkg/config" + "github.com/coze-dev/coze-studio/backend/pkg/logs" +) + +func GetWorkflowDebugURL(ctx context.Context, workflowID, spaceID, executeID int64) string { + defaultURL := fmt.Sprintf("http://127.0.0.1:8888/work_flow?execute_id=%d&space_id=%d&workflow_id=%d&execute_mode=2", executeID, spaceID, workflowID) + + serverHost, err := config.Base().GetServerHost(ctx) + if err != nil { + logs.CtxErrorf(ctx, "[GetWorkflowDebugURL] get base config failed, use default debug url instead, err: %v", err) + return defaultURL + } + + workFlowURL, err := url.JoinPath(serverHost, "work_flow") + if err != nil { + logs.CtxErrorf(ctx, "[GetWorkflowDebugURL] join path failed, use default debug url instead, err: %v", err) + return defaultURL + } + + u, err := url.Parse(workFlowURL) + if err != nil { + logs.CtxErrorf(ctx, "[GetWorkflowDebugURL] parse workflow url failed, use default debug url instead, err: %v", err) + return defaultURL + } + + q := u.Query() + q.Set("execute_id", strconv.FormatInt(executeID, 10)) + q.Set("space_id", strconv.FormatInt(spaceID, 10)) + q.Set("workflow_id", strconv.FormatInt(workflowID, 10)) + q.Set("execute_mode", "2") + u.RawQuery = q.Encode() + + return u.String() +} diff --git a/backend/crossdomain/workflow/model/workflow.go b/backend/crossdomain/workflow/model/workflow.go index 9d63bb631..ff42ee04f 100644 --- a/backend/crossdomain/workflow/model/workflow.go +++ b/backend/crossdomain/workflow/model/workflow.go @@ -84,8 +84,6 @@ const ( SyncPatternStream SyncPattern = "stream" ) -var DebugURLTpl = "http://127.0.0.1:3000/work_flow?execute_id=%d&space_id=%d&workflow_id=%d&execute_mode=2" - type BizType string const ( diff --git a/backend/domain/workflow/entity/vo/node.go b/backend/domain/workflow/entity/vo/node.go index eb5e4dace..af21716d4 100644 --- a/backend/domain/workflow/entity/vo/node.go +++ b/backend/domain/workflow/entity/vo/node.go @@ -17,13 +17,14 @@ package vo import ( + "context" "errors" "fmt" "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" - workflowModel "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/model" + "github.com/coze-dev/coze-studio/backend/bizpkg/debugutil" "github.com/coze-dev/coze-studio/backend/pkg/errorx" "github.com/coze-dev/coze-studio/backend/pkg/sonic" "github.com/coze-dev/coze-studio/backend/types/errno" @@ -99,7 +100,7 @@ type wfErr struct { func (w *wfErr) DebugURL() string { if w.StatusError.Extra() == nil { - return fmt.Sprintf(workflowModel.DebugURLTpl, w.exeID, w.spaceID, w.workflowID) + return debugutil.GetWorkflowDebugURL(context.Background(), w.workflowID, w.spaceID, w.exeID) } debugURL, ok := w.StatusError.Extra()["debug_url"] @@ -107,7 +108,7 @@ func (w *wfErr) DebugURL() string { return debugURL } - return fmt.Sprintf(workflowModel.DebugURLTpl, w.exeID, w.spaceID, w.workflowID) + return debugutil.GetWorkflowDebugURL(context.Background(), w.workflowID, w.spaceID, w.exeID) } func (w *wfErr) Level() ErrorLevel { @@ -176,7 +177,7 @@ func WrapError(code int, err error, opts ...errorx.Option) WorkflowError { } func WrapWithDebug(code int, err error, exeID, spaceID, workflowID int64, opts ...errorx.Option) WorkflowError { - debugURL := fmt.Sprintf(workflowModel.DebugURLTpl, exeID, spaceID, workflowID) + debugURL := debugutil.GetWorkflowDebugURL(context.Background(), workflowID, spaceID, exeID) opts = append(opts, errorx.Extra("debug_url", debugURL)) return WrapError(code, err, opts...) }