feat: allow more customizability for custom workflow node (#1930)

This commit is contained in:
shentongmartin
2025-09-03 15:01:41 +08:00
committed by GitHub
parent 8db69fd6a9
commit f7b5096d69
57 changed files with 1591 additions and 1557 deletions

View File

@ -719,12 +719,13 @@ func withSpecificNodeID(id string) func(options *getProcessOptions) {
}
type exeResult struct {
output string
status workflow.WorkflowExeStatus
event *workflow.NodeEvent
token *workflow.TokenAndCost
t *testing.T
reason string
output string
status workflow.WorkflowExeStatus
event *workflow.NodeEvent
token *workflow.TokenAndCost
t *testing.T
reason string
nodeResults []*workflow.NodeResult
}
func (e *exeResult) assertSuccess() {
@ -743,6 +744,26 @@ func (e *exeResult) tokenEqual(in, out int) {
assert.Equal(e.t, out, outputI)
}
func (e *exeResult) nodeResultHasResponseExtra(nodeID string, k string, v any) {
var n *workflow.NodeResult
for _, nr := range e.nodeResults {
if nr.NodeId == nodeID {
n = nr
break
}
}
if n == nil {
e.t.Fatal("node key: ", nodeID, "not found")
return
}
extra := mustUnmarshalToMap(e.t, n.Extra)
assert.NotEmpty(e.t, extra)
assert.Contains(e.t, extra, "response_extra")
assert.Equal(e.t, extra["response_extra"].(map[string]any)[k], v)
}
func (r *wfTestRunner) getProcess(id, exeID string, opts ...func(options *getProcessOptions)) *exeResult {
options := &getProcessOptions{}
for _, opt := range opts {
@ -756,6 +777,7 @@ func (r *wfTestRunner) getProcess(id, exeID string, opts ...func(options *getPro
var nodeType string
var token *workflow.TokenAndCost
var reason string
var nodeResults []*workflow.NodeResult
var count int
for {
if nodeEvent != nil {
@ -808,18 +830,22 @@ func (r *wfTestRunner) getProcess(id, exeID string, opts ...func(options *getPro
if nodeEvent != nil {
eventID = nodeEvent.ID
}
nodeResults = getProcessResp.Data.NodeResults
r.t.Logf("getProcess output= %s, status= %v, eventID= %s, nodeType= %s", output, workflowStatus, eventID, nodeType)
count++
}
return &exeResult{
output: output,
status: workflowStatus,
event: nodeEvent,
token: token,
t: r.t,
reason: reason,
output: output,
status: workflowStatus,
event: nodeEvent,
token: token,
t: r.t,
reason: reason,
nodeResults: nodeResults,
}
}
@ -1140,7 +1166,8 @@ func TestTestRunAndGetProcess(t *testing.T) {
mockey.PatchConvey("test run success, then cancel", func() {
exeID := r.testRun(id, input)
r.getProcess(id, exeID)
exeResult := r.getProcess(id, exeID)
exeResult.nodeResultHasResponseExtra(entity.ExitNodeKey, "terminal_plan", int64(2))
// cancel after success, nothing happens
r.cancel(id, exeID)
@ -1374,6 +1401,7 @@ func TestTestResumeWithInputNode(t *testing.T) {
result := r.getNodeExeHistory(id, exeID, "154951", nil)
assert.Equal(t, mustUnmarshalToMap(t, e2.output), mustUnmarshalToMap(t, result.Output))
assert.Equal(t, mustUnmarshalToMap(t, e2.output), mustUnmarshalToMap(t, result.Input))
})
mockey.PatchConvey("sync run does not support interrupt", func() {
@ -2589,6 +2617,7 @@ func TestAggregateStreamVariables(t *testing.T) {
e := r.getProcess(id, exeID)
e.assertSuccess()
assert.Equal(t, "I won't tell you.\nI won't tell you.\n{\"Group1\":\"I won't tell you.\",\"input\":\"I've got an important question\"}", e.output)
e.nodeResultHasResponseExtra(entity.ExitNodeKey, "terminal_plan", int64(2))
defer r.runServer()()