Compare commits
1 Commits
main
...
feat/sync_
| Author | SHA1 | Date | |
|---|---|---|---|
| b0c6dd2046 |
@ -1459,6 +1459,37 @@ func TestTestResumeWithInputNode(t *testing.T) {
|
||||
resp := post[workflow.OpenAPIRunFlowResponse](r, syncRunReq)
|
||||
assert.Equal(t, int64(errno.ErrOpenAPIInterruptNotSupported), resp.Code)
|
||||
})
|
||||
|
||||
mockey.PatchConvey("test run, then sync resume", func() {
|
||||
ctx := t.Context()
|
||||
exeID := r.testRun(id, map[string]string{
|
||||
"input": "unused initial input",
|
||||
})
|
||||
e := r.getProcess(id, exeID)
|
||||
assert.NotNil(t, e.event) // interrupted
|
||||
|
||||
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
|
||||
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
|
||||
|
||||
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(ctx, &entity.ResumeRequest{
|
||||
ExecuteID: exeInt64ID,
|
||||
EventID: eventInt64ID,
|
||||
ResumeData: userInputStr,
|
||||
}, workflowModel.ExecuteConfig{
|
||||
Operator: 123,
|
||||
Mode: workflowModel.ExecuteModeDebug,
|
||||
BizType: workflowModel.BizTypeWorkflow,
|
||||
Cancellable: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, map[string]any{
|
||||
"input": "user input",
|
||||
"inputArr": nil,
|
||||
"field1": `["1","2"]`,
|
||||
}, mustUnmarshalToMap(t, *result.Output))
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@ -1823,64 +1854,173 @@ func TestInterruptWithinBatch(t *testing.T) {
|
||||
r := newWfTestRunner(t)
|
||||
defer r.closeFn()
|
||||
|
||||
id := r.load("batch/batch_with_inner_interrupt.json")
|
||||
exeID := r.testRun(id, map[string]string{
|
||||
"input_array": `["a","b"]`,
|
||||
"batch_concurrency": "2",
|
||||
mockey.PatchConvey("test run with async resume", func() {
|
||||
id := r.load("batch/batch_with_inner_interrupt.json")
|
||||
exeID := r.testRun(id, map[string]string{
|
||||
"input_array": `["a","b"]`,
|
||||
"batch_concurrency": "2",
|
||||
})
|
||||
|
||||
e := r.getProcess(id, exeID)
|
||||
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
|
||||
|
||||
exeIDInt, _ := strconv.ParseInt(exeID, 0, 64)
|
||||
storeIEs, _ := workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e.event.ID, map[string]any{
|
||||
"input": "input 1",
|
||||
})
|
||||
|
||||
e2 := r.getProcess(id, exeID, withPreviousEventID(e.event.ID))
|
||||
assert.Equal(t, workflow.EventType_InputNode, e2.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e2.event.ID, map[string]any{
|
||||
"input": "input 2",
|
||||
})
|
||||
|
||||
e3 := r.getProcess(id, exeID, withPreviousEventID(e2.event.ID))
|
||||
assert.Equal(t, workflow.EventType_Question, e3.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e3.event.ID, "answer 1")
|
||||
|
||||
e4 := r.getProcess(id, exeID, withPreviousEventID(e3.event.ID))
|
||||
assert.Equal(t, workflow.EventType_Question, e4.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 1, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e4.event.ID, "answer 2")
|
||||
|
||||
e5 := r.getProcess(id, exeID, withPreviousEventID(e4.event.ID))
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 0, len(storeIEs))
|
||||
e5.assertSuccess()
|
||||
|
||||
outputMap := mustUnmarshalToMap(t, e5.output)
|
||||
|
||||
if !reflect.DeepEqual(outputMap, map[string]any{
|
||||
"output": []any{"answer 1", "answer 2"},
|
||||
}) && !reflect.DeepEqual(outputMap, map[string]any{
|
||||
"output": []any{"answer 2", "answer 1"},
|
||||
}) {
|
||||
t.Errorf("output map not equal: %v", outputMap)
|
||||
}
|
||||
})
|
||||
mockey.PatchConvey("test run with sync resume", func() {
|
||||
id := r.load("batch/batch_with_inner_interrupt_for_debug_run.json")
|
||||
|
||||
exeID := r.testRun(id, map[string]string{
|
||||
"input_array": `["a","b"]`,
|
||||
"batch_concurrency": "2",
|
||||
})
|
||||
|
||||
e := r.getProcess(id, exeID)
|
||||
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
|
||||
|
||||
data := map[string]any{
|
||||
"input": "123",
|
||||
}
|
||||
bs, _ := sonic.Marshal(data)
|
||||
|
||||
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
|
||||
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
|
||||
|
||||
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
|
||||
ExecuteID: exeInt64ID,
|
||||
EventID: eventInt64ID,
|
||||
ResumeData: string(bs),
|
||||
}, workflowModel.ExecuteConfig{
|
||||
Operator: 123,
|
||||
Mode: workflowModel.ExecuteModeDebug,
|
||||
BizType: workflowModel.BizTypeWorkflow,
|
||||
Cancellable: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, result.Status, entity.WorkflowInterrupted)
|
||||
assert.NotNil(t, result.InterruptEvents)
|
||||
data = map[string]any{
|
||||
"input": "456",
|
||||
}
|
||||
bs, _ = sonic.Marshal(data)
|
||||
|
||||
result, _, err = appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
|
||||
ExecuteID: exeInt64ID,
|
||||
EventID: result.InterruptEvents[0].ID,
|
||||
ResumeData: string(bs),
|
||||
}, workflowModel.ExecuteConfig{
|
||||
Operator: 123,
|
||||
Mode: workflowModel.ExecuteModeDebug,
|
||||
BizType: workflowModel.BizTypeWorkflow,
|
||||
Cancellable: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, map[string]any{
|
||||
"output": []any{"123", "456"},
|
||||
}, mustUnmarshalToMap(t, *result.Output))
|
||||
|
||||
})
|
||||
mockey.PatchConvey("node debug run with sync resume", func() {
|
||||
id := r.load("batch/batch_with_inner_interrupt_for_debug_run.json")
|
||||
|
||||
exeID := r.nodeDebug(id, "105709", withNDBatch(map[string]string{}))
|
||||
|
||||
e := r.getProcess(id, exeID)
|
||||
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
|
||||
|
||||
data := map[string]any{
|
||||
"input": "123",
|
||||
}
|
||||
bs, _ := sonic.Marshal(data)
|
||||
|
||||
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
|
||||
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
|
||||
|
||||
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
|
||||
ExecuteID: exeInt64ID,
|
||||
EventID: eventInt64ID,
|
||||
ResumeData: string(bs),
|
||||
}, workflowModel.ExecuteConfig{
|
||||
Operator: 123,
|
||||
Mode: workflowModel.ExecuteModeNodeDebug,
|
||||
BizType: workflowModel.BizTypeWorkflow,
|
||||
Cancellable: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, result.Status, entity.WorkflowInterrupted)
|
||||
assert.NotNil(t, result.InterruptEvents)
|
||||
data = map[string]any{
|
||||
"input": "456",
|
||||
}
|
||||
bs, _ = sonic.Marshal(data)
|
||||
|
||||
result, _, err = appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
|
||||
ExecuteID: exeInt64ID,
|
||||
EventID: result.InterruptEvents[0].ID,
|
||||
ResumeData: string(bs),
|
||||
}, workflowModel.ExecuteConfig{
|
||||
Operator: 123,
|
||||
Mode: workflowModel.ExecuteModeNodeDebug,
|
||||
BizType: workflowModel.BizTypeWorkflow,
|
||||
Cancellable: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, map[string]any{
|
||||
"output": []any{"123", "456"},
|
||||
}, mustUnmarshalToMap(t, *result.Output))
|
||||
fmt.Println(result, err)
|
||||
|
||||
})
|
||||
|
||||
e := r.getProcess(id, exeID)
|
||||
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
|
||||
|
||||
exeIDInt, _ := strconv.ParseInt(exeID, 0, 64)
|
||||
storeIEs, _ := workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e.event.ID, map[string]any{
|
||||
"input": "input 1",
|
||||
})
|
||||
|
||||
e2 := r.getProcess(id, exeID, withPreviousEventID(e.event.ID))
|
||||
assert.Equal(t, workflow.EventType_InputNode, e2.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e2.event.ID, map[string]any{
|
||||
"input": "input 2",
|
||||
})
|
||||
|
||||
e3 := r.getProcess(id, exeID, withPreviousEventID(e2.event.ID))
|
||||
assert.Equal(t, workflow.EventType_Question, e3.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 2, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e3.event.ID, "answer 1")
|
||||
|
||||
e4 := r.getProcess(id, exeID, withPreviousEventID(e3.event.ID))
|
||||
assert.Equal(t, workflow.EventType_Question, e4.event.Type)
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 1, len(storeIEs))
|
||||
|
||||
r.testResume(id, exeID, e4.event.ID, "answer 2")
|
||||
|
||||
e5 := r.getProcess(id, exeID, withPreviousEventID(e4.event.ID))
|
||||
|
||||
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
|
||||
assert.Equal(t, 0, len(storeIEs))
|
||||
e5.assertSuccess()
|
||||
|
||||
outputMap := mustUnmarshalToMap(t, e5.output)
|
||||
|
||||
if !reflect.DeepEqual(outputMap, map[string]any{
|
||||
"output": []any{"answer 1", "answer 2"},
|
||||
}) && !reflect.DeepEqual(outputMap, map[string]any{
|
||||
"output": []any{"answer 2", "answer 1"},
|
||||
}) {
|
||||
t.Errorf("output map not equal: %v", outputMap)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -6292,3 +6432,5 @@ func TestChatFlowRun(t *testing.T) {
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@ -35,6 +35,7 @@ type Executable interface {
|
||||
AsyncExecute(ctx context.Context, config workflowModel.ExecuteConfig, input map[string]any) (int64, error)
|
||||
AsyncExecuteNode(ctx context.Context, nodeID string, config workflowModel.ExecuteConfig, input map[string]any) (int64, error)
|
||||
AsyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) error
|
||||
SyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error)
|
||||
StreamExecute(ctx context.Context, config workflowModel.ExecuteConfig, input map[string]any) (*schema.StreamReader[*entity.Message], error)
|
||||
StreamResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (
|
||||
*schema.StreamReader[*entity.Message], error)
|
||||
|
||||
@ -0,0 +1,209 @@
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": "100001",
|
||||
"type": "1",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 180,
|
||||
"y": 13.700000000000003
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"nodeMeta": {
|
||||
"description": "工作流的起始节点,用于设定启动工作流需要的信息",
|
||||
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-start.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=cc901caf2ac7105e181a549398596a3d77a45b1735d75082da7924bcd6ee4a56",
|
||||
"subTitle": "",
|
||||
"title": "开始"
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"type": "string",
|
||||
"name": "input",
|
||||
"required": false
|
||||
}
|
||||
],
|
||||
"trigger_parameters": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "900001",
|
||||
"type": "2",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 1300,
|
||||
"y": 0.7000000000000028
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"nodeMeta": {
|
||||
"description": "工作流的最终节点,用于返回工作流运行后的结果信息",
|
||||
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-end.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=61259d932108c153d5470e7fe21984f0961007508b01b1e884250744ede25dde",
|
||||
"subTitle": "",
|
||||
"title": "结束"
|
||||
},
|
||||
"inputs": {
|
||||
"terminatePlan": "returnVariables",
|
||||
"inputParameters": [
|
||||
{
|
||||
"name": "output",
|
||||
"input": {
|
||||
"type": "list",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
},
|
||||
"value": {
|
||||
"type": "ref",
|
||||
"content": {
|
||||
"source": "block-output",
|
||||
"blockID": "105709",
|
||||
"name": "output"
|
||||
},
|
||||
"rawMeta": {
|
||||
"type": 99
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "105709",
|
||||
"type": "28",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 740,
|
||||
"y": 0
|
||||
},
|
||||
"canvasPosition": {
|
||||
"x": 560,
|
||||
"y": 293.4
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"nodeMeta": {
|
||||
"title": "批处理",
|
||||
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-batch.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=ea39a384a6d4ddb2dfe873ab7277b996747e42e129da23367c62068bf8980068",
|
||||
"description": "通过设定批量运行次数和逻辑,运行批处理体内的任务",
|
||||
"mainColor": "#00B2B2",
|
||||
"subTitle": "批处理"
|
||||
},
|
||||
"inputs": {
|
||||
"concurrentSize": {
|
||||
"type": "integer",
|
||||
"value": {
|
||||
"type": "literal",
|
||||
"content": "1"
|
||||
}
|
||||
},
|
||||
"batchSize": {
|
||||
"type": "integer",
|
||||
"value": {
|
||||
"type": "literal",
|
||||
"content": "100"
|
||||
}
|
||||
},
|
||||
"inputParameters": [
|
||||
{
|
||||
"name": "input",
|
||||
"input": {
|
||||
"type": "list",
|
||||
"value": {
|
||||
"type": "literal",
|
||||
"content": "[\"12\",\"2\"]",
|
||||
"rawMeta": {
|
||||
"type": 99
|
||||
}
|
||||
},
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "output",
|
||||
"input": {
|
||||
"type": "list",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
},
|
||||
"value": {
|
||||
"type": "ref",
|
||||
"content": {
|
||||
"source": "block-output",
|
||||
"blockID": "136577",
|
||||
"name": "input"
|
||||
},
|
||||
"rawMeta": {
|
||||
"type": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"blocks": [
|
||||
{
|
||||
"id": "136577",
|
||||
"type": "30",
|
||||
"meta": {
|
||||
"position": {
|
||||
"x": 180,
|
||||
"y": 0
|
||||
}
|
||||
},
|
||||
"data": {
|
||||
"outputs": [
|
||||
{
|
||||
"type": "string",
|
||||
"name": "input",
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"nodeMeta": {
|
||||
"title": "输入",
|
||||
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-input.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=d0af4e55a091037e82c025f29e925fbdb0592ca1319d03948a1833b5559ab2c4",
|
||||
"description": "支持中间过程的信息输入",
|
||||
"mainColor": "#5C62FF",
|
||||
"subTitle": "输入"
|
||||
},
|
||||
"inputs": {
|
||||
"outputSchema": "[{\"type\":\"string\",\"name\":\"input\",\"required\":true}]"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"sourceNodeID": "105709",
|
||||
"targetNodeID": "136577",
|
||||
"sourcePortID": "batch-function-inline-output"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "136577",
|
||||
"targetNodeID": "105709",
|
||||
"targetPortID": "batch-function-inline-input"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"edges": [
|
||||
{
|
||||
"sourceNodeID": "100001",
|
||||
"targetNodeID": "105709"
|
||||
},
|
||||
{
|
||||
"sourceNodeID": "105709",
|
||||
"targetNodeID": "900001",
|
||||
"sourcePortID": "batch-output"
|
||||
}
|
||||
],
|
||||
"versions": {
|
||||
"loop": "v2"
|
||||
}
|
||||
}
|
||||
@ -315,44 +315,42 @@ func handleEvent(ctx context.Context, event *Event, repo workflow.Repository,
|
||||
}
|
||||
|
||||
// TODO: there maybe time gap here
|
||||
|
||||
if err := repo.SaveInterruptEvents(ctx, event.RootExecuteID, event.InterruptEvents); err != nil {
|
||||
return noTerminate, fmt.Errorf("failed to save interrupt events: %v", err)
|
||||
}
|
||||
|
||||
if sw != nil && event.SubWorkflowCtx == nil { // only send interrupt event when is root workflow
|
||||
if event.SubWorkflowCtx == nil {
|
||||
firstIE, found, err := repo.GetFirstInterruptEvent(ctx, event.RootExecuteID)
|
||||
if err != nil {
|
||||
return noTerminate, fmt.Errorf("failed to get first interrupt event: %v", err)
|
||||
}
|
||||
|
||||
if !found {
|
||||
return noTerminate, fmt.Errorf("interrupt event does not exist, wfExeID: %d", event.RootExecuteID)
|
||||
}
|
||||
|
||||
nodeKey := firstIE.NodeKey
|
||||
|
||||
sw.Send(&entity.Message{
|
||||
DataMessage: &entity.DataMessage{
|
||||
ExecuteID: event.RootExecuteID,
|
||||
Role: schema.Assistant,
|
||||
Type: entity.Answer,
|
||||
Content: firstIE.InterruptData, // TODO: may need to extract from InterruptData the actual info for user
|
||||
NodeID: string(nodeKey),
|
||||
NodeType: firstIE.NodeType,
|
||||
NodeTitle: firstIE.NodeTitle,
|
||||
Last: true,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
sw.Send(&entity.Message{
|
||||
StateMessage: &entity.StateMessage{
|
||||
ExecuteID: event.RootExecuteID,
|
||||
EventID: event.GetResumedEventID(),
|
||||
Status: entity.WorkflowInterrupted,
|
||||
InterruptEvent: firstIE,
|
||||
},
|
||||
}, nil)
|
||||
event.InterruptEvents = []*entity.InterruptEvent{firstIE}
|
||||
if sw != nil { // only send interrupt event when is root workflow
|
||||
nodeKey := firstIE.NodeKey
|
||||
sw.Send(&entity.Message{
|
||||
DataMessage: &entity.DataMessage{
|
||||
ExecuteID: event.RootExecuteID,
|
||||
Role: schema.Assistant,
|
||||
Type: entity.Answer,
|
||||
Content: firstIE.InterruptData, // TODO: may need to extract from InterruptData the actual info for user
|
||||
NodeID: string(nodeKey),
|
||||
NodeType: firstIE.NodeType,
|
||||
NodeTitle: firstIE.NodeTitle,
|
||||
Last: true,
|
||||
},
|
||||
}, nil)
|
||||
sw.Send(&entity.Message{
|
||||
StateMessage: &entity.StateMessage{
|
||||
ExecuteID: event.RootExecuteID,
|
||||
EventID: event.GetResumedEventID(),
|
||||
Status: entity.WorkflowInterrupted,
|
||||
InterruptEvent: firstIE,
|
||||
},
|
||||
}, nil)
|
||||
}
|
||||
}
|
||||
|
||||
return workflowAbort, nil
|
||||
|
||||
@ -37,6 +37,7 @@ import (
|
||||
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/compose"
|
||||
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
|
||||
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
|
||||
wfschema "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
|
||||
@ -906,6 +907,205 @@ func (i *impl) AsyncResume(ctx context.Context, req *entity.ResumeRequest, confi
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *impl) SyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error) {
|
||||
var err error
|
||||
wfExe, found, err := i.repo.GetWorkflowExecution(ctx, req.ExecuteID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if !found {
|
||||
return nil, "", fmt.Errorf("workflow execution does not exist, id: %d", req.ExecuteID)
|
||||
}
|
||||
|
||||
if wfExe.RootExecutionID != wfExe.ID {
|
||||
return nil, "", fmt.Errorf("only root workflow can be resumed")
|
||||
}
|
||||
|
||||
if wfExe.Status != entity.WorkflowInterrupted {
|
||||
return nil, "", fmt.Errorf("workflow execution %d is not interrupted, status is %v, cannot resume", req.ExecuteID, wfExe.Status)
|
||||
}
|
||||
|
||||
var from workflowModel.Locator
|
||||
if wfExe.Version == "" {
|
||||
from = workflowModel.FromDraft
|
||||
} else {
|
||||
from = workflowModel.FromSpecificVersion
|
||||
}
|
||||
|
||||
wfEntity, err := i.Get(ctx, &vo.GetPolicy{
|
||||
ID: wfExe.WorkflowID,
|
||||
QType: from,
|
||||
Version: wfExe.Version,
|
||||
CommitID: wfExe.CommitID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
var canvas vo.Canvas
|
||||
err = sonic.UnmarshalString(wfEntity.Canvas, &canvas)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
config.From = from
|
||||
config.Version = wfExe.Version
|
||||
config.AppID = wfExe.AppID
|
||||
config.AgentID = wfExe.AgentID
|
||||
config.CommitID = wfExe.CommitID
|
||||
config.WorkflowMode = wfEntity.Mode
|
||||
|
||||
if config.ConnectorID == 0 {
|
||||
config.ConnectorID = wfExe.ConnectorID
|
||||
}
|
||||
|
||||
var (
|
||||
lastEventChan <-chan *execute.Event
|
||||
startTime time.Time
|
||||
out map[string]any
|
||||
wf *compose.Workflow
|
||||
cancelCtx context.Context
|
||||
opts []einoCompose.Option
|
||||
nodeCount int32
|
||||
workflowSC *wfschema.WorkflowSchema
|
||||
)
|
||||
if wfExe.Mode == workflowModel.ExecuteModeNodeDebug {
|
||||
var nodeExes []*entity.NodeExecution
|
||||
nodeExes, err = i.repo.GetNodeExecutionsByWfExeID(ctx, wfExe.ID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if len(nodeExes) == 0 {
|
||||
return nil, "", fmt.Errorf("during node debug resume, no node execution found for workflow execution %d", wfExe.ID)
|
||||
}
|
||||
|
||||
var nodeID string
|
||||
for _, ne := range nodeExes {
|
||||
if ne.ParentNodeID == nil {
|
||||
nodeID = ne.NodeID
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
workflowSC, err = adaptor.WorkflowSchemaFromNode(ctx, &canvas, nodeID)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to convert canvas to workflow schema: %w", err)
|
||||
}
|
||||
nodeCount = workflowSC.NodeCount()
|
||||
wf, err = compose.NewWorkflowFromNode(ctx, workflowSC, vo.NodeKey(nodeID),
|
||||
einoCompose.WithGraphName(fmt.Sprintf("%d", wfExe.WorkflowID)))
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to create workflow: %w", err)
|
||||
}
|
||||
|
||||
config.Mode = workflowModel.ExecuteModeNodeDebug
|
||||
|
||||
cancelCtx, _, opts, lastEventChan, err = compose.NewWorkflowRunner(
|
||||
wfEntity.GetBasic(), workflowSC, config, compose.WithResumeReq(req)).Prepare(ctx)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
startTime = time.Now()
|
||||
out, err = wf.SyncRun(cancelCtx, nil, opts...)
|
||||
|
||||
} else {
|
||||
workflowSC, err = adaptor.CanvasToWorkflowSchema(ctx, &canvas)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to convert canvas to workflow schema: %w", err)
|
||||
}
|
||||
|
||||
nodeCount = workflowSC.NodeCount()
|
||||
var wfOpts []compose.WorkflowOption
|
||||
wfOpts = append(wfOpts, compose.WithIDAsName(wfExe.WorkflowID))
|
||||
if s := execute.GetStaticConfig(); s != nil && s.MaxNodeCountPerWorkflow > 0 {
|
||||
wfOpts = append(wfOpts, compose.WithMaxNodeCount(s.MaxNodeCountPerWorkflow))
|
||||
}
|
||||
|
||||
wf, err = compose.NewWorkflow(ctx, workflowSC, wfOpts...)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to create workflow: %w", err)
|
||||
}
|
||||
|
||||
cancelCtx, _, opts, lastEventChan, err = compose.NewWorkflowRunner(
|
||||
wfEntity.GetBasic(), workflowSC, config, compose.WithResumeReq(req)).Prepare(ctx)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
|
||||
out, err = wf.SyncRun(cancelCtx, nil, opts...)
|
||||
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if _, ok := einoCompose.ExtractInterruptInfo(err); !ok {
|
||||
var wfe vo.WorkflowError
|
||||
if errors.As(err, &wfe) {
|
||||
return nil, "", wfe.AppendDebug(req.ExecuteID, wfEntity.SpaceID, wfEntity.ID)
|
||||
} else {
|
||||
return nil, "", vo.WrapWithDebug(errno.ErrWorkflowExecuteFail, err, req.ExecuteID, wfEntity.SpaceID, wfEntity.ID, errorx.KV("cause", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lastEvent := <-lastEventChan
|
||||
updateTime := time.Now()
|
||||
|
||||
var outStr string
|
||||
if wf.TerminatePlan() == vo.ReturnVariables {
|
||||
outStr, err = sonic.MarshalString(out)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
} else {
|
||||
outStr = out["output"].(string)
|
||||
}
|
||||
|
||||
var status entity.WorkflowExecuteStatus
|
||||
switch lastEvent.Type {
|
||||
case execute.WorkflowSuccess:
|
||||
status = entity.WorkflowSuccess
|
||||
case execute.WorkflowInterrupt:
|
||||
status = entity.WorkflowInterrupted
|
||||
case execute.WorkflowFailed:
|
||||
status = entity.WorkflowFailed
|
||||
case execute.WorkflowCancel:
|
||||
status = entity.WorkflowCancel
|
||||
}
|
||||
|
||||
var failReason *string
|
||||
if lastEvent.Err != nil {
|
||||
failReason = ptr.Of(lastEvent.Err.Error())
|
||||
}
|
||||
|
||||
return &entity.WorkflowExecution{
|
||||
ID: req.ExecuteID,
|
||||
WorkflowID: wfEntity.ID,
|
||||
Version: wfEntity.GetVersion(),
|
||||
SpaceID: wfEntity.SpaceID,
|
||||
ExecuteConfig: config,
|
||||
CreatedAt: startTime,
|
||||
NodeCount: nodeCount,
|
||||
Status: status,
|
||||
Duration: lastEvent.Duration,
|
||||
Input: ptr.Of(req.ResumeData),
|
||||
Output: ptr.Of(outStr),
|
||||
ErrorCode: ptr.Of("-1"),
|
||||
FailReason: failReason,
|
||||
TokenInfo: &entity.TokenUsage{
|
||||
InputTokens: lastEvent.GetInputTokens(),
|
||||
OutputTokens: lastEvent.GetOutputTokens(),
|
||||
},
|
||||
UpdatedAt: ptr.Of(updateTime),
|
||||
RootExecutionID: req.ExecuteID,
|
||||
InterruptEvents: lastEvent.InterruptEvents,
|
||||
}, wf.TerminatePlan(), nil
|
||||
|
||||
}
|
||||
|
||||
// StreamResume resumes a workflow execution, using the passed in executionID and eventID.
|
||||
// Intermediate results during the resuming run are emitted using the returned StreamReader.
|
||||
// Caller is expected to poll the execution status using the GetExecution method.
|
||||
|
||||
@ -310,7 +310,6 @@ require (
|
||||
github.com/mtibben/percent v0.2.1 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/shoenig/go-m1cpu v0.1.6 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
|
||||
golang.org/x/term v0.32.0 // indirect
|
||||
|
||||
@ -350,9 +350,9 @@ func (mr *MockServiceMockRecorder) GetConvRelatedInfo(ctx, convID any) *gomock.C
|
||||
}
|
||||
|
||||
// GetConversationNameByID mocks base method.
|
||||
func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (string, bool, error) {
|
||||
func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (string, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetConversationNameByID", ctx, env, appID, connectorID, conversationID)
|
||||
ret := m.ctrl.Call(m, "GetConversationNameByID", ctx, env, bizID, connectorID, conversationID)
|
||||
ret0, _ := ret[0].(string)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -360,9 +360,9 @@ func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, a
|
||||
}
|
||||
|
||||
// GetConversationNameByID indicates an expected call of GetConversationNameByID.
|
||||
func (mr *MockServiceMockRecorder) GetConversationNameByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
|
||||
func (mr *MockServiceMockRecorder) GetConversationNameByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConversationNameByID", reflect.TypeOf((*MockService)(nil).GetConversationNameByID), ctx, env, appID, connectorID, conversationID)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConversationNameByID", reflect.TypeOf((*MockService)(nil).GetConversationNameByID), ctx, env, bizID, connectorID, conversationID)
|
||||
}
|
||||
|
||||
// GetDynamicConversationByName mocks base method.
|
||||
@ -446,9 +446,9 @@ func (mr *MockServiceMockRecorder) GetNodeExecution(ctx, exeID, nodeID any) *gom
|
||||
}
|
||||
|
||||
// GetOrCreateConversation mocks base method.
|
||||
func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, int64, error) {
|
||||
func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, bizID, connectorID, userID int64, conversationName string) (int64, int64, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetOrCreateConversation", ctx, env, appID, connectorID, userID, conversationName)
|
||||
ret := m.ctrl.Call(m, "GetOrCreateConversation", ctx, env, bizID, connectorID, userID, conversationName)
|
||||
ret0, _ := ret[0].(int64)
|
||||
ret1, _ := ret[1].(int64)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -456,9 +456,9 @@ func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, a
|
||||
}
|
||||
|
||||
// GetOrCreateConversation indicates an expected call of GetOrCreateConversation.
|
||||
func (mr *MockServiceMockRecorder) GetOrCreateConversation(ctx, env, appID, connectorID, userID, conversationName any) *gomock.Call {
|
||||
func (mr *MockServiceMockRecorder) GetOrCreateConversation(ctx, env, bizID, connectorID, userID, conversationName any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateConversation", reflect.TypeOf((*MockService)(nil).GetOrCreateConversation), ctx, env, appID, connectorID, userID, conversationName)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateConversation", reflect.TypeOf((*MockService)(nil).GetOrCreateConversation), ctx, env, bizID, connectorID, userID, conversationName)
|
||||
}
|
||||
|
||||
// GetTemplateByName mocks base method.
|
||||
@ -774,6 +774,22 @@ func (mr *MockServiceMockRecorder) SyncRelatedWorkflowResources(ctx, appID, rela
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncRelatedWorkflowResources", reflect.TypeOf((*MockService)(nil).SyncRelatedWorkflowResources), ctx, appID, relatedWorkflows, related)
|
||||
}
|
||||
|
||||
// SyncResume mocks base method.
|
||||
func (m *MockService) SyncResume(ctx context.Context, req *entity.ResumeRequest, arg2 workflow.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncResume", ctx, req, arg2)
|
||||
ret0, _ := ret[0].(*entity.WorkflowExecution)
|
||||
ret1, _ := ret[1].(vo.TerminatePlan)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// SyncResume indicates an expected call of SyncResume.
|
||||
func (mr *MockServiceMockRecorder) SyncResume(ctx, req, arg2 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncResume", reflect.TypeOf((*MockService)(nil).SyncResume), ctx, req, arg2)
|
||||
}
|
||||
|
||||
// UpdateChatFlowRole mocks base method.
|
||||
func (m *MockService) UpdateChatFlowRole(ctx context.Context, workflowID int64, role *vo.ChatFlowRoleUpdate) error {
|
||||
m.ctrl.T.Helper()
|
||||
@ -1329,9 +1345,9 @@ func (mr *MockRepositoryMockRecorder) GetDraftWorkflowsByAppID(ctx, AppID any) *
|
||||
}
|
||||
|
||||
// GetDynamicConversationByID mocks base method.
|
||||
func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (*entity.DynamicConversation, bool, error) {
|
||||
func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (*entity.DynamicConversation, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetDynamicConversationByID", ctx, env, appID, connectorID, conversationID)
|
||||
ret := m.ctrl.Call(m, "GetDynamicConversationByID", ctx, env, bizID, connectorID, conversationID)
|
||||
ret0, _ := ret[0].(*entity.DynamicConversation)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -1339,9 +1355,9 @@ func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.
|
||||
}
|
||||
|
||||
// GetDynamicConversationByID indicates an expected call of GetDynamicConversationByID.
|
||||
func (mr *MockRepositoryMockRecorder) GetDynamicConversationByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
|
||||
func (mr *MockRepositoryMockRecorder) GetDynamicConversationByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDynamicConversationByID", reflect.TypeOf((*MockRepository)(nil).GetDynamicConversationByID), ctx, env, appID, connectorID, conversationID)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDynamicConversationByID", reflect.TypeOf((*MockRepository)(nil).GetDynamicConversationByID), ctx, env, bizID, connectorID, conversationID)
|
||||
}
|
||||
|
||||
// GetDynamicConversationByName mocks base method.
|
||||
@ -1565,9 +1581,9 @@ func (mr *MockRepositoryMockRecorder) GetOrCreateStaticConversation(ctx, env, id
|
||||
}
|
||||
|
||||
// GetStaticConversationByID mocks base method.
|
||||
func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (string, bool, error) {
|
||||
func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (string, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetStaticConversationByID", ctx, env, appID, connectorID, conversationID)
|
||||
ret := m.ctrl.Call(m, "GetStaticConversationByID", ctx, env, bizID, connectorID, conversationID)
|
||||
ret0, _ := ret[0].(string)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -1575,9 +1591,9 @@ func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.E
|
||||
}
|
||||
|
||||
// GetStaticConversationByID indicates an expected call of GetStaticConversationByID.
|
||||
func (mr *MockRepositoryMockRecorder) GetStaticConversationByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
|
||||
func (mr *MockRepositoryMockRecorder) GetStaticConversationByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaticConversationByID", reflect.TypeOf((*MockRepository)(nil).GetStaticConversationByID), ctx, env, appID, connectorID, conversationID)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaticConversationByID", reflect.TypeOf((*MockRepository)(nil).GetStaticConversationByID), ctx, env, bizID, connectorID, conversationID)
|
||||
}
|
||||
|
||||
// GetStaticConversationByTemplateID mocks base method.
|
||||
|
||||
Reference in New Issue
Block a user