feat: add sync resume for interrupt workflow

This commit is contained in:
zhuangjie.1125
2025-09-17 20:47:28 +08:00
parent b05004f188
commit b0c6dd2046
7 changed files with 665 additions and 100 deletions

View File

@ -1459,6 +1459,37 @@ func TestTestResumeWithInputNode(t *testing.T) {
resp := post[workflow.OpenAPIRunFlowResponse](r, syncRunReq)
assert.Equal(t, int64(errno.ErrOpenAPIInterruptNotSupported), resp.Code)
})
mockey.PatchConvey("test run, then sync resume", func() {
ctx := t.Context()
exeID := r.testRun(id, map[string]string{
"input": "unused initial input",
})
e := r.getProcess(id, exeID)
assert.NotNil(t, e.event) // interrupted
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(ctx, &entity.ResumeRequest{
ExecuteID: exeInt64ID,
EventID: eventInt64ID,
ResumeData: userInputStr,
}, workflowModel.ExecuteConfig{
Operator: 123,
Mode: workflowModel.ExecuteModeDebug,
BizType: workflowModel.BizTypeWorkflow,
Cancellable: true,
})
assert.NoError(t, err)
assert.Equal(t, map[string]any{
"input": "user input",
"inputArr": nil,
"field1": `["1","2"]`,
}, mustUnmarshalToMap(t, *result.Output))
})
})
}
@ -1823,64 +1854,173 @@ func TestInterruptWithinBatch(t *testing.T) {
r := newWfTestRunner(t)
defer r.closeFn()
id := r.load("batch/batch_with_inner_interrupt.json")
exeID := r.testRun(id, map[string]string{
"input_array": `["a","b"]`,
"batch_concurrency": "2",
mockey.PatchConvey("test run with async resume", func() {
id := r.load("batch/batch_with_inner_interrupt.json")
exeID := r.testRun(id, map[string]string{
"input_array": `["a","b"]`,
"batch_concurrency": "2",
})
e := r.getProcess(id, exeID)
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
exeIDInt, _ := strconv.ParseInt(exeID, 0, 64)
storeIEs, _ := workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e.event.ID, map[string]any{
"input": "input 1",
})
e2 := r.getProcess(id, exeID, withPreviousEventID(e.event.ID))
assert.Equal(t, workflow.EventType_InputNode, e2.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e2.event.ID, map[string]any{
"input": "input 2",
})
e3 := r.getProcess(id, exeID, withPreviousEventID(e2.event.ID))
assert.Equal(t, workflow.EventType_Question, e3.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e3.event.ID, "answer 1")
e4 := r.getProcess(id, exeID, withPreviousEventID(e3.event.ID))
assert.Equal(t, workflow.EventType_Question, e4.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 1, len(storeIEs))
r.testResume(id, exeID, e4.event.ID, "answer 2")
e5 := r.getProcess(id, exeID, withPreviousEventID(e4.event.ID))
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 0, len(storeIEs))
e5.assertSuccess()
outputMap := mustUnmarshalToMap(t, e5.output)
if !reflect.DeepEqual(outputMap, map[string]any{
"output": []any{"answer 1", "answer 2"},
}) && !reflect.DeepEqual(outputMap, map[string]any{
"output": []any{"answer 2", "answer 1"},
}) {
t.Errorf("output map not equal: %v", outputMap)
}
})
mockey.PatchConvey("test run with sync resume", func() {
id := r.load("batch/batch_with_inner_interrupt_for_debug_run.json")
exeID := r.testRun(id, map[string]string{
"input_array": `["a","b"]`,
"batch_concurrency": "2",
})
e := r.getProcess(id, exeID)
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
data := map[string]any{
"input": "123",
}
bs, _ := sonic.Marshal(data)
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
ExecuteID: exeInt64ID,
EventID: eventInt64ID,
ResumeData: string(bs),
}, workflowModel.ExecuteConfig{
Operator: 123,
Mode: workflowModel.ExecuteModeDebug,
BizType: workflowModel.BizTypeWorkflow,
Cancellable: true,
})
assert.NoError(t, err)
assert.Equal(t, result.Status, entity.WorkflowInterrupted)
assert.NotNil(t, result.InterruptEvents)
data = map[string]any{
"input": "456",
}
bs, _ = sonic.Marshal(data)
result, _, err = appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
ExecuteID: exeInt64ID,
EventID: result.InterruptEvents[0].ID,
ResumeData: string(bs),
}, workflowModel.ExecuteConfig{
Operator: 123,
Mode: workflowModel.ExecuteModeDebug,
BizType: workflowModel.BizTypeWorkflow,
Cancellable: true,
})
assert.NoError(t, err)
assert.Equal(t, map[string]any{
"output": []any{"123", "456"},
}, mustUnmarshalToMap(t, *result.Output))
})
mockey.PatchConvey("node debug run with sync resume", func() {
id := r.load("batch/batch_with_inner_interrupt_for_debug_run.json")
exeID := r.nodeDebug(id, "105709", withNDBatch(map[string]string{}))
e := r.getProcess(id, exeID)
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
data := map[string]any{
"input": "123",
}
bs, _ := sonic.Marshal(data)
exeInt64ID, _ := strconv.ParseInt(exeID, 10, 64)
eventInt64ID, _ := strconv.ParseInt(e.event.ID, 10, 64)
result, _, err := appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
ExecuteID: exeInt64ID,
EventID: eventInt64ID,
ResumeData: string(bs),
}, workflowModel.ExecuteConfig{
Operator: 123,
Mode: workflowModel.ExecuteModeNodeDebug,
BizType: workflowModel.BizTypeWorkflow,
Cancellable: true,
})
assert.NoError(t, err)
assert.Equal(t, result.Status, entity.WorkflowInterrupted)
assert.NotNil(t, result.InterruptEvents)
data = map[string]any{
"input": "456",
}
bs, _ = sonic.Marshal(data)
result, _, err = appworkflow.GetWorkflowDomainSVC().SyncResume(t.Context(), &entity.ResumeRequest{
ExecuteID: exeInt64ID,
EventID: result.InterruptEvents[0].ID,
ResumeData: string(bs),
}, workflowModel.ExecuteConfig{
Operator: 123,
Mode: workflowModel.ExecuteModeNodeDebug,
BizType: workflowModel.BizTypeWorkflow,
Cancellable: true,
})
assert.NoError(t, err)
assert.Equal(t, map[string]any{
"output": []any{"123", "456"},
}, mustUnmarshalToMap(t, *result.Output))
fmt.Println(result, err)
})
e := r.getProcess(id, exeID)
assert.Equal(t, workflow.EventType_InputNode, e.event.Type)
exeIDInt, _ := strconv.ParseInt(exeID, 0, 64)
storeIEs, _ := workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e.event.ID, map[string]any{
"input": "input 1",
})
e2 := r.getProcess(id, exeID, withPreviousEventID(e.event.ID))
assert.Equal(t, workflow.EventType_InputNode, e2.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e2.event.ID, map[string]any{
"input": "input 2",
})
e3 := r.getProcess(id, exeID, withPreviousEventID(e2.event.ID))
assert.Equal(t, workflow.EventType_Question, e3.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 2, len(storeIEs))
r.testResume(id, exeID, e3.event.ID, "answer 1")
e4 := r.getProcess(id, exeID, withPreviousEventID(e3.event.ID))
assert.Equal(t, workflow.EventType_Question, e4.event.Type)
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 1, len(storeIEs))
r.testResume(id, exeID, e4.event.ID, "answer 2")
e5 := r.getProcess(id, exeID, withPreviousEventID(e4.event.ID))
storeIEs, _ = workflow2.GetRepository().ListInterruptEvents(t.Context(), exeIDInt)
assert.Equal(t, 0, len(storeIEs))
e5.assertSuccess()
outputMap := mustUnmarshalToMap(t, e5.output)
if !reflect.DeepEqual(outputMap, map[string]any{
"output": []any{"answer 1", "answer 2"},
}) && !reflect.DeepEqual(outputMap, map[string]any{
"output": []any{"answer 2", "answer 1"},
}) {
t.Errorf("output map not equal: %v", outputMap)
}
})
}
@ -6292,3 +6432,5 @@ func TestChatFlowRun(t *testing.T) {
})
}
//

View File

@ -35,6 +35,7 @@ type Executable interface {
AsyncExecute(ctx context.Context, config workflowModel.ExecuteConfig, input map[string]any) (int64, error)
AsyncExecuteNode(ctx context.Context, nodeID string, config workflowModel.ExecuteConfig, input map[string]any) (int64, error)
AsyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) error
SyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error)
StreamExecute(ctx context.Context, config workflowModel.ExecuteConfig, input map[string]any) (*schema.StreamReader[*entity.Message], error)
StreamResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (
*schema.StreamReader[*entity.Message], error)

View File

@ -0,0 +1,209 @@
{
"nodes": [
{
"id": "100001",
"type": "1",
"meta": {
"position": {
"x": 180,
"y": 13.700000000000003
}
},
"data": {
"nodeMeta": {
"description": "工作流的起始节点,用于设定启动工作流需要的信息",
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-start.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=cc901caf2ac7105e181a549398596a3d77a45b1735d75082da7924bcd6ee4a56",
"subTitle": "",
"title": "开始"
},
"outputs": [
{
"type": "string",
"name": "input",
"required": false
}
],
"trigger_parameters": []
}
},
{
"id": "900001",
"type": "2",
"meta": {
"position": {
"x": 1300,
"y": 0.7000000000000028
}
},
"data": {
"nodeMeta": {
"description": "工作流的最终节点,用于返回工作流运行后的结果信息",
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-end.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=61259d932108c153d5470e7fe21984f0961007508b01b1e884250744ede25dde",
"subTitle": "",
"title": "结束"
},
"inputs": {
"terminatePlan": "returnVariables",
"inputParameters": [
{
"name": "output",
"input": {
"type": "list",
"schema": {
"type": "string"
},
"value": {
"type": "ref",
"content": {
"source": "block-output",
"blockID": "105709",
"name": "output"
},
"rawMeta": {
"type": 99
}
}
}
}
]
}
}
},
{
"id": "105709",
"type": "28",
"meta": {
"position": {
"x": 740,
"y": 0
},
"canvasPosition": {
"x": 560,
"y": 293.4
}
},
"data": {
"nodeMeta": {
"title": "批处理",
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-batch.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=ea39a384a6d4ddb2dfe873ab7277b996747e42e129da23367c62068bf8980068",
"description": "通过设定批量运行次数和逻辑,运行批处理体内的任务",
"mainColor": "#00B2B2",
"subTitle": "批处理"
},
"inputs": {
"concurrentSize": {
"type": "integer",
"value": {
"type": "literal",
"content": "1"
}
},
"batchSize": {
"type": "integer",
"value": {
"type": "literal",
"content": "100"
}
},
"inputParameters": [
{
"name": "input",
"input": {
"type": "list",
"value": {
"type": "literal",
"content": "[\"12\",\"2\"]",
"rawMeta": {
"type": 99
}
},
"schema": {
"type": "string"
}
}
}
]
},
"outputs": [
{
"name": "output",
"input": {
"type": "list",
"schema": {
"type": "string"
},
"value": {
"type": "ref",
"content": {
"source": "block-output",
"blockID": "136577",
"name": "input"
},
"rawMeta": {
"type": 1
}
}
}
}
]
},
"blocks": [
{
"id": "136577",
"type": "30",
"meta": {
"position": {
"x": 180,
"y": 0
}
},
"data": {
"outputs": [
{
"type": "string",
"name": "input",
"required": true
}
],
"nodeMeta": {
"title": "输入",
"icon": "http://10.37.46.247:9000/opencoze/default_icon/workflow_icon/icon-input.jpg?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250917%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250917T100753Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=d0af4e55a091037e82c025f29e925fbdb0592ca1319d03948a1833b5559ab2c4",
"description": "支持中间过程的信息输入",
"mainColor": "#5C62FF",
"subTitle": "输入"
},
"inputs": {
"outputSchema": "[{\"type\":\"string\",\"name\":\"input\",\"required\":true}]"
}
}
}
],
"edges": [
{
"sourceNodeID": "105709",
"targetNodeID": "136577",
"sourcePortID": "batch-function-inline-output"
},
{
"sourceNodeID": "136577",
"targetNodeID": "105709",
"targetPortID": "batch-function-inline-input"
}
]
}
],
"edges": [
{
"sourceNodeID": "100001",
"targetNodeID": "105709"
},
{
"sourceNodeID": "105709",
"targetNodeID": "900001",
"sourcePortID": "batch-output"
}
],
"versions": {
"loop": "v2"
}
}

View File

@ -315,44 +315,42 @@ func handleEvent(ctx context.Context, event *Event, repo workflow.Repository,
}
// TODO: there maybe time gap here
if err := repo.SaveInterruptEvents(ctx, event.RootExecuteID, event.InterruptEvents); err != nil {
return noTerminate, fmt.Errorf("failed to save interrupt events: %v", err)
}
if sw != nil && event.SubWorkflowCtx == nil { // only send interrupt event when is root workflow
if event.SubWorkflowCtx == nil {
firstIE, found, err := repo.GetFirstInterruptEvent(ctx, event.RootExecuteID)
if err != nil {
return noTerminate, fmt.Errorf("failed to get first interrupt event: %v", err)
}
if !found {
return noTerminate, fmt.Errorf("interrupt event does not exist, wfExeID: %d", event.RootExecuteID)
}
nodeKey := firstIE.NodeKey
sw.Send(&entity.Message{
DataMessage: &entity.DataMessage{
ExecuteID: event.RootExecuteID,
Role: schema.Assistant,
Type: entity.Answer,
Content: firstIE.InterruptData, // TODO: may need to extract from InterruptData the actual info for user
NodeID: string(nodeKey),
NodeType: firstIE.NodeType,
NodeTitle: firstIE.NodeTitle,
Last: true,
},
}, nil)
sw.Send(&entity.Message{
StateMessage: &entity.StateMessage{
ExecuteID: event.RootExecuteID,
EventID: event.GetResumedEventID(),
Status: entity.WorkflowInterrupted,
InterruptEvent: firstIE,
},
}, nil)
event.InterruptEvents = []*entity.InterruptEvent{firstIE}
if sw != nil { // only send interrupt event when is root workflow
nodeKey := firstIE.NodeKey
sw.Send(&entity.Message{
DataMessage: &entity.DataMessage{
ExecuteID: event.RootExecuteID,
Role: schema.Assistant,
Type: entity.Answer,
Content: firstIE.InterruptData, // TODO: may need to extract from InterruptData the actual info for user
NodeID: string(nodeKey),
NodeType: firstIE.NodeType,
NodeTitle: firstIE.NodeTitle,
Last: true,
},
}, nil)
sw.Send(&entity.Message{
StateMessage: &entity.StateMessage{
ExecuteID: event.RootExecuteID,
EventID: event.GetResumedEventID(),
Status: entity.WorkflowInterrupted,
InterruptEvent: firstIE,
},
}, nil)
}
}
return workflowAbort, nil

View File

@ -37,6 +37,7 @@ import (
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/compose"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
wfschema "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
@ -906,6 +907,205 @@ func (i *impl) AsyncResume(ctx context.Context, req *entity.ResumeRequest, confi
return nil
}
func (i *impl) SyncResume(ctx context.Context, req *entity.ResumeRequest, config workflowModel.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error) {
var err error
wfExe, found, err := i.repo.GetWorkflowExecution(ctx, req.ExecuteID)
if err != nil {
return nil, "", err
}
if !found {
return nil, "", fmt.Errorf("workflow execution does not exist, id: %d", req.ExecuteID)
}
if wfExe.RootExecutionID != wfExe.ID {
return nil, "", fmt.Errorf("only root workflow can be resumed")
}
if wfExe.Status != entity.WorkflowInterrupted {
return nil, "", fmt.Errorf("workflow execution %d is not interrupted, status is %v, cannot resume", req.ExecuteID, wfExe.Status)
}
var from workflowModel.Locator
if wfExe.Version == "" {
from = workflowModel.FromDraft
} else {
from = workflowModel.FromSpecificVersion
}
wfEntity, err := i.Get(ctx, &vo.GetPolicy{
ID: wfExe.WorkflowID,
QType: from,
Version: wfExe.Version,
CommitID: wfExe.CommitID,
})
if err != nil {
return nil, "", err
}
var canvas vo.Canvas
err = sonic.UnmarshalString(wfEntity.Canvas, &canvas)
if err != nil {
return nil, "", err
}
config.From = from
config.Version = wfExe.Version
config.AppID = wfExe.AppID
config.AgentID = wfExe.AgentID
config.CommitID = wfExe.CommitID
config.WorkflowMode = wfEntity.Mode
if config.ConnectorID == 0 {
config.ConnectorID = wfExe.ConnectorID
}
var (
lastEventChan <-chan *execute.Event
startTime time.Time
out map[string]any
wf *compose.Workflow
cancelCtx context.Context
opts []einoCompose.Option
nodeCount int32
workflowSC *wfschema.WorkflowSchema
)
if wfExe.Mode == workflowModel.ExecuteModeNodeDebug {
var nodeExes []*entity.NodeExecution
nodeExes, err = i.repo.GetNodeExecutionsByWfExeID(ctx, wfExe.ID)
if err != nil {
return nil, "", err
}
if len(nodeExes) == 0 {
return nil, "", fmt.Errorf("during node debug resume, no node execution found for workflow execution %d", wfExe.ID)
}
var nodeID string
for _, ne := range nodeExes {
if ne.ParentNodeID == nil {
nodeID = ne.NodeID
break
}
}
workflowSC, err = adaptor.WorkflowSchemaFromNode(ctx, &canvas, nodeID)
if err != nil {
return nil, "", fmt.Errorf("failed to convert canvas to workflow schema: %w", err)
}
nodeCount = workflowSC.NodeCount()
wf, err = compose.NewWorkflowFromNode(ctx, workflowSC, vo.NodeKey(nodeID),
einoCompose.WithGraphName(fmt.Sprintf("%d", wfExe.WorkflowID)))
if err != nil {
return nil, "", fmt.Errorf("failed to create workflow: %w", err)
}
config.Mode = workflowModel.ExecuteModeNodeDebug
cancelCtx, _, opts, lastEventChan, err = compose.NewWorkflowRunner(
wfEntity.GetBasic(), workflowSC, config, compose.WithResumeReq(req)).Prepare(ctx)
if err != nil {
return nil, "", err
}
startTime = time.Now()
out, err = wf.SyncRun(cancelCtx, nil, opts...)
} else {
workflowSC, err = adaptor.CanvasToWorkflowSchema(ctx, &canvas)
if err != nil {
return nil, "", fmt.Errorf("failed to convert canvas to workflow schema: %w", err)
}
nodeCount = workflowSC.NodeCount()
var wfOpts []compose.WorkflowOption
wfOpts = append(wfOpts, compose.WithIDAsName(wfExe.WorkflowID))
if s := execute.GetStaticConfig(); s != nil && s.MaxNodeCountPerWorkflow > 0 {
wfOpts = append(wfOpts, compose.WithMaxNodeCount(s.MaxNodeCountPerWorkflow))
}
wf, err = compose.NewWorkflow(ctx, workflowSC, wfOpts...)
if err != nil {
return nil, "", fmt.Errorf("failed to create workflow: %w", err)
}
cancelCtx, _, opts, lastEventChan, err = compose.NewWorkflowRunner(
wfEntity.GetBasic(), workflowSC, config, compose.WithResumeReq(req)).Prepare(ctx)
if err != nil {
return nil, "", err
}
startTime = time.Now()
out, err = wf.SyncRun(cancelCtx, nil, opts...)
}
if err != nil {
if _, ok := einoCompose.ExtractInterruptInfo(err); !ok {
var wfe vo.WorkflowError
if errors.As(err, &wfe) {
return nil, "", wfe.AppendDebug(req.ExecuteID, wfEntity.SpaceID, wfEntity.ID)
} else {
return nil, "", vo.WrapWithDebug(errno.ErrWorkflowExecuteFail, err, req.ExecuteID, wfEntity.SpaceID, wfEntity.ID, errorx.KV("cause", err.Error()))
}
}
}
lastEvent := <-lastEventChan
updateTime := time.Now()
var outStr string
if wf.TerminatePlan() == vo.ReturnVariables {
outStr, err = sonic.MarshalString(out)
if err != nil {
return nil, "", err
}
} else {
outStr = out["output"].(string)
}
var status entity.WorkflowExecuteStatus
switch lastEvent.Type {
case execute.WorkflowSuccess:
status = entity.WorkflowSuccess
case execute.WorkflowInterrupt:
status = entity.WorkflowInterrupted
case execute.WorkflowFailed:
status = entity.WorkflowFailed
case execute.WorkflowCancel:
status = entity.WorkflowCancel
}
var failReason *string
if lastEvent.Err != nil {
failReason = ptr.Of(lastEvent.Err.Error())
}
return &entity.WorkflowExecution{
ID: req.ExecuteID,
WorkflowID: wfEntity.ID,
Version: wfEntity.GetVersion(),
SpaceID: wfEntity.SpaceID,
ExecuteConfig: config,
CreatedAt: startTime,
NodeCount: nodeCount,
Status: status,
Duration: lastEvent.Duration,
Input: ptr.Of(req.ResumeData),
Output: ptr.Of(outStr),
ErrorCode: ptr.Of("-1"),
FailReason: failReason,
TokenInfo: &entity.TokenUsage{
InputTokens: lastEvent.GetInputTokens(),
OutputTokens: lastEvent.GetOutputTokens(),
},
UpdatedAt: ptr.Of(updateTime),
RootExecutionID: req.ExecuteID,
InterruptEvents: lastEvent.InterruptEvents,
}, wf.TerminatePlan(), nil
}
// StreamResume resumes a workflow execution, using the passed in executionID and eventID.
// Intermediate results during the resuming run are emitted using the returned StreamReader.
// Caller is expected to poll the execution status using the GetExecution method.

View File

@ -310,7 +310,6 @@ require (
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
golang.org/x/term v0.32.0 // indirect

View File

@ -350,9 +350,9 @@ func (mr *MockServiceMockRecorder) GetConvRelatedInfo(ctx, convID any) *gomock.C
}
// GetConversationNameByID mocks base method.
func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (string, bool, error) {
func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (string, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetConversationNameByID", ctx, env, appID, connectorID, conversationID)
ret := m.ctrl.Call(m, "GetConversationNameByID", ctx, env, bizID, connectorID, conversationID)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
@ -360,9 +360,9 @@ func (m *MockService) GetConversationNameByID(ctx context.Context, env vo.Env, a
}
// GetConversationNameByID indicates an expected call of GetConversationNameByID.
func (mr *MockServiceMockRecorder) GetConversationNameByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
func (mr *MockServiceMockRecorder) GetConversationNameByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConversationNameByID", reflect.TypeOf((*MockService)(nil).GetConversationNameByID), ctx, env, appID, connectorID, conversationID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConversationNameByID", reflect.TypeOf((*MockService)(nil).GetConversationNameByID), ctx, env, bizID, connectorID, conversationID)
}
// GetDynamicConversationByName mocks base method.
@ -446,9 +446,9 @@ func (mr *MockServiceMockRecorder) GetNodeExecution(ctx, exeID, nodeID any) *gom
}
// GetOrCreateConversation mocks base method.
func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, int64, error) {
func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, bizID, connectorID, userID int64, conversationName string) (int64, int64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetOrCreateConversation", ctx, env, appID, connectorID, userID, conversationName)
ret := m.ctrl.Call(m, "GetOrCreateConversation", ctx, env, bizID, connectorID, userID, conversationName)
ret0, _ := ret[0].(int64)
ret1, _ := ret[1].(int64)
ret2, _ := ret[2].(error)
@ -456,9 +456,9 @@ func (m *MockService) GetOrCreateConversation(ctx context.Context, env vo.Env, a
}
// GetOrCreateConversation indicates an expected call of GetOrCreateConversation.
func (mr *MockServiceMockRecorder) GetOrCreateConversation(ctx, env, appID, connectorID, userID, conversationName any) *gomock.Call {
func (mr *MockServiceMockRecorder) GetOrCreateConversation(ctx, env, bizID, connectorID, userID, conversationName any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateConversation", reflect.TypeOf((*MockService)(nil).GetOrCreateConversation), ctx, env, appID, connectorID, userID, conversationName)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateConversation", reflect.TypeOf((*MockService)(nil).GetOrCreateConversation), ctx, env, bizID, connectorID, userID, conversationName)
}
// GetTemplateByName mocks base method.
@ -774,6 +774,22 @@ func (mr *MockServiceMockRecorder) SyncRelatedWorkflowResources(ctx, appID, rela
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncRelatedWorkflowResources", reflect.TypeOf((*MockService)(nil).SyncRelatedWorkflowResources), ctx, appID, relatedWorkflows, related)
}
// SyncResume mocks base method.
func (m *MockService) SyncResume(ctx context.Context, req *entity.ResumeRequest, arg2 workflow.ExecuteConfig) (*entity.WorkflowExecution, vo.TerminatePlan, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncResume", ctx, req, arg2)
ret0, _ := ret[0].(*entity.WorkflowExecution)
ret1, _ := ret[1].(vo.TerminatePlan)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// SyncResume indicates an expected call of SyncResume.
func (mr *MockServiceMockRecorder) SyncResume(ctx, req, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncResume", reflect.TypeOf((*MockService)(nil).SyncResume), ctx, req, arg2)
}
// UpdateChatFlowRole mocks base method.
func (m *MockService) UpdateChatFlowRole(ctx context.Context, workflowID int64, role *vo.ChatFlowRoleUpdate) error {
m.ctrl.T.Helper()
@ -1329,9 +1345,9 @@ func (mr *MockRepositoryMockRecorder) GetDraftWorkflowsByAppID(ctx, AppID any) *
}
// GetDynamicConversationByID mocks base method.
func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (*entity.DynamicConversation, bool, error) {
func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (*entity.DynamicConversation, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetDynamicConversationByID", ctx, env, appID, connectorID, conversationID)
ret := m.ctrl.Call(m, "GetDynamicConversationByID", ctx, env, bizID, connectorID, conversationID)
ret0, _ := ret[0].(*entity.DynamicConversation)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
@ -1339,9 +1355,9 @@ func (m *MockRepository) GetDynamicConversationByID(ctx context.Context, env vo.
}
// GetDynamicConversationByID indicates an expected call of GetDynamicConversationByID.
func (mr *MockRepositoryMockRecorder) GetDynamicConversationByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
func (mr *MockRepositoryMockRecorder) GetDynamicConversationByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDynamicConversationByID", reflect.TypeOf((*MockRepository)(nil).GetDynamicConversationByID), ctx, env, appID, connectorID, conversationID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDynamicConversationByID", reflect.TypeOf((*MockRepository)(nil).GetDynamicConversationByID), ctx, env, bizID, connectorID, conversationID)
}
// GetDynamicConversationByName mocks base method.
@ -1565,9 +1581,9 @@ func (mr *MockRepositoryMockRecorder) GetOrCreateStaticConversation(ctx, env, id
}
// GetStaticConversationByID mocks base method.
func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.Env, appID, connectorID, conversationID int64) (string, bool, error) {
func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.Env, bizID, connectorID, conversationID int64) (string, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStaticConversationByID", ctx, env, appID, connectorID, conversationID)
ret := m.ctrl.Call(m, "GetStaticConversationByID", ctx, env, bizID, connectorID, conversationID)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
@ -1575,9 +1591,9 @@ func (m *MockRepository) GetStaticConversationByID(ctx context.Context, env vo.E
}
// GetStaticConversationByID indicates an expected call of GetStaticConversationByID.
func (mr *MockRepositoryMockRecorder) GetStaticConversationByID(ctx, env, appID, connectorID, conversationID any) *gomock.Call {
func (mr *MockRepositoryMockRecorder) GetStaticConversationByID(ctx, env, bizID, connectorID, conversationID any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaticConversationByID", reflect.TypeOf((*MockRepository)(nil).GetStaticConversationByID), ctx, env, appID, connectorID, conversationID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaticConversationByID", reflect.TypeOf((*MockRepository)(nil).GetStaticConversationByID), ctx, env, bizID, connectorID, conversationID)
}
// GetStaticConversationByTemplateID mocks base method.