feat(backend):workflow support conversation manager & add conversation/message nodes

This commit is contained in:
zhuangjie.1125
2025-07-30 12:26:46 +08:00
parent 8ba5a552c2
commit 6ca8d53345
13 changed files with 3697 additions and 2 deletions

View File

@ -28,8 +28,6 @@ import (
"github.com/coze-dev/coze-studio/backend/api/model/crossdomain/message"
"github.com/coze-dev/coze-studio/backend/api/model/workflow"
"github.com/coze-dev/coze-studio/backend/application/base/ctxutil"
"github.com/coze-dev/coze-studio/backend/crossdomain/contract/crossconversation"
"github.com/coze-dev/coze-studio/backend/crossdomain/contract/crossmessage"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
@ -234,6 +232,10 @@ func defaultCard() *inputCard {
return card
}
func (w *ApplicationService) CreateApplicationConversationDef(ctx context.Context, req *workflow.CreateProjectConversationDefRequest) (resp *workflow.CreateProjectConversationDefResponse, err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
@ -1370,3 +1372,5 @@ func renderSelectOptionCardDSL(c string) (string, error) {
return rCardString, nil
}
=======
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)

View File

@ -28,6 +28,7 @@ import (
wfconversation "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/conversation"
"github.com/coze-dev/coze-studio/backend/crossdomain/impl/code"
wfconversation "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/conversation"
wfplugin "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/plugin"
wfsearch "github.com/coze-dev/coze-studio/backend/crossdomain/workflow/search"
"github.com/coze-dev/coze-studio/backend/crossdomain/workflow/variable"
@ -37,7 +38,10 @@ import (
plugin "github.com/coze-dev/coze-studio/backend/domain/plugin/service"
search "github.com/coze-dev/coze-studio/backend/domain/search/service"
"github.com/coze-dev/coze-studio/backend/domain/workflow"
<<<<<<< HEAD
crossconversation "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
=======
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
crossplugin "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/plugin"
crosssearch "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/search"
crossvariable "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/variable"
@ -49,6 +53,8 @@ import (
"github.com/coze-dev/coze-studio/backend/infra/contract/imagex"
"github.com/coze-dev/coze-studio/backend/infra/contract/storage"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
crossconversation "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
)
type ServiceComponents struct {

View File

@ -195,7 +195,11 @@ func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.C
if !req.IsSetProjectID() || mustParseInt64(req.GetProjectID()) == 0 || !createConversation {
conversationName = "Default"
}
<<<<<<< HEAD
wf.InitCanvasSchema = vo.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)
=======
wf.InitCanvasSchema = entity.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
}
id, err := GetWorkflowDomainSVC().Create(ctx, wf)
@ -3917,11 +3921,15 @@ func (w *ApplicationService) GetChatFlowRole(ctx context.Context, req *workflow.
var version string
if wf.Meta.AppID != nil {
<<<<<<< HEAD
if vl, err := GetWorkflowDomainSVC().GetWorkflowVersionsByConnector(ctx, mustParseInt64(req.GetConnectorID()), wf.ID, 1); err != nil {
return nil, err
} else if len(vl) > 0 {
version = vl[0]
}
=======
version = "" // TODO : search version from DB using AppID
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
}
role, err := GetWorkflowDomainSVC().GetChatFlowRole(ctx, mustParseInt64(req.WorkflowID), version)
@ -4029,6 +4037,7 @@ func (w *ApplicationService) convertChatFlowRole(ctx context.Context, role *enti
return res, nil
}
<<<<<<< HEAD
func (w *ApplicationService) OpenAPIGetWorkflowInfo(ctx context.Context, req *workflow.OpenAPIGetWorkflowInfoRequest) (
_ *workflow.OpenAPIGetWorkflowInfoResponse, err error) {
@ -4092,3 +4101,5 @@ func (w *ApplicationService) OpenAPIGetWorkflowInfo(ctx context.Context, req *wo
},
}, nil
}
=======
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)

View File

@ -99,3 +99,7 @@ func (i *impl) GetWorkflowIDsByAppID(ctx context.Context, appID int64) ([]int64,
return a.ID
}), err
}
func (i *impl) InitApplicationDefaultConversationTemplate(ctx context.Context, spaceID int64, appID int64, userID int64) error {
return i.DomainSVC.InitApplicationDefaultConversationTemplate(ctx, spaceID, appID, userID)
}

View File

@ -1,3 +1,4 @@
<<<<<<< HEAD
/*
* Copyright 2025 coze-dev Authors
*
@ -171,7 +172,11 @@ func convertMessage(msgs []*msgentity.Message) ([]*conversation.Message, error)
for _, m := range msgs {
msg := &conversation.Message{
ID: m.ID,
<<<<<<< HEAD
Role: m.Role,
=======
Role: string(m.Role),
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: string(m.ContentType)}
if m.MultiContent != nil {
@ -180,13 +185,22 @@ func convertMessage(msgs []*msgentity.Message) ([]*conversation.Message, error)
if c.FileData != nil {
for _, fd := range c.FileData {
mcs = append(mcs, &conversation.Content{
<<<<<<< HEAD
Type: c.Type,
Uri: ptr.Of(fd.URI),
=======
Type: string(c.Type),
Uri: ptr.Of(fd.Url),
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
})
}
} else {
mcs = append(mcs, &conversation.Content{
<<<<<<< HEAD
Type: c.Type,
=======
Type: string(c.Type),
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
Text: ptr.Of(c.Text),
})
}

View File

@ -1,3 +1,4 @@
<<<<<<< HEAD
/*
* Copyright 2025 coze-dev Authors
*
@ -14,12 +15,14 @@
* limitations under the License.
*/
package conversation
import (
"testing"
"github.com/cloudwego/eino/schema"
apimessage "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/message"
"github.com/coze-dev/coze-studio/backend/domain/conversation/message/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
@ -44,7 +47,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 1,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text",
MultiContent: []*apimessage.InputMetaData{
{
@ -60,7 +67,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 1,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text",
MultiContent: []*conversation.Content{
{Type: "text", Text: ptr.Of("hello")},
@ -76,14 +87,22 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 2,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "file",
MultiContent: []*apimessage.InputMetaData{
{
Type: "file",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "f_uri_1",
=======
Url: "f_uri_1",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -100,7 +119,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 2,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "file",
MultiContent: []*conversation.Content{
{Type: "file", Uri: ptr.Of("f_uri_1")},
@ -117,7 +140,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 3,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text_file",
MultiContent: []*apimessage.InputMetaData{
{
@ -128,7 +155,11 @@ func Test_convertMessage(t *testing.T) {
Type: "file",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "f_uri_2",
=======
Url: "f_uri_2",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -141,7 +172,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 3,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text_file",
MultiContent: []*conversation.Content{
{Type: "text", Text: ptr.Of("hello")},
@ -158,17 +193,28 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 4,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "file",
MultiContent: []*apimessage.InputMetaData{
{
Type: "file",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "f_uri_3",
},
{
URI: "f_uri_4",
=======
Url: "f_uri_3",
},
{
Url: "f_uri_4",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -185,7 +231,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 4,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "file",
MultiContent: []*conversation.Content{
{Type: "file", Uri: ptr.Of("f_uri_3")},
@ -203,7 +253,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 5,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text",
MultiContent: []*apimessage.InputMetaData{
{
@ -219,7 +273,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 5,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "text",
MultiContent: []*conversation.Content{
{Type: "text", Text: ptr.Of("")},
@ -235,14 +293,22 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 6,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "image",
MultiContent: []*apimessage.InputMetaData{
{
Type: "image",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "image_uri_5",
=======
Url: "image_uri_5",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -259,7 +325,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 6,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "image",
MultiContent: []*conversation.Content{
{Type: "image", Uri: ptr.Of("image_uri_5")},
@ -276,17 +346,28 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 7,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "image",
MultiContent: []*apimessage.InputMetaData{
{
Type: "image",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "file_id_6",
},
{
URI: "file_id_7",
=======
Url: "file_id_6",
},
{
Url: "file_id_7",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -303,7 +384,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 7,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "image",
MultiContent: []*conversation.Content{
{Type: "image", Uri: ptr.Of("file_id_6")},
@ -321,7 +406,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*entity.Message{
{
ID: 8,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "mix",
MultiContent: []*apimessage.InputMetaData{
{
@ -332,7 +421,11 @@ func Test_convertMessage(t *testing.T) {
Type: "image",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "file_id_8",
=======
Url: "file_id_8",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -340,7 +433,11 @@ func Test_convertMessage(t *testing.T) {
Type: "file",
FileData: []*apimessage.FileData{
{
<<<<<<< HEAD
URI: "file_id_9",
=======
Url: "file_id_9",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
},
},
},
@ -353,7 +450,11 @@ func Test_convertMessage(t *testing.T) {
Messages: []*conversation.Message{
{
ID: 8,
<<<<<<< HEAD
Role: schema.User,
=======
Role: "user",
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
ContentType: "mix",
MultiContent: []*conversation.Content{
{Type: "text", Text: ptr.Of("hello")},

View File

@ -25,6 +25,7 @@ import (
connectorModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/connector"
databaseModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/database"
knowledgeModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/knowledge"
crossconnector "github.com/coze-dev/coze-studio/backend/crossdomain/contract/connector"
crossdatabase "github.com/coze-dev/coze-studio/backend/crossdomain/contract/database"
crossknowledge "github.com/coze-dev/coze-studio/backend/crossdomain/contract/knowledge"

View File

@ -69,6 +69,7 @@ type ConversationService interface {
InitApplicationDefaultConversationTemplate(ctx context.Context, spaceID int64, appID int64, userID int64) error
GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
UpdateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
}
type InterruptEventStore interface {
@ -129,6 +130,9 @@ type ConversationRepository interface {
ListDynamicConversation(ctx context.Context, env vo.Env, policy *vo.ListConversationPolicy) ([]*entity.DynamicConversation, error)
BatchCreateOnlineConversationTemplate(ctx context.Context, templates []*entity.ConversationTemplate, version string) error
UpdateDynamicConversationNameByID(ctx context.Context, env vo.Env, templateID int64, name string) error
<<<<<<< HEAD
UpdateStaticConversation(ctx context.Context, env vo.Env, templateID int64, connectorID int64, userID int64, newConversationID int64) error
UpdateDynamicConversation(ctx context.Context, env vo.Env, conversationID, newConversationID int64) error
=======
>>>>>>> a86ea8d1 (feat(backend):workflow support conversation manager & add conversation/message nodes)
}

File diff suppressed because it is too large Load Diff

View File

@ -65,6 +65,7 @@ type Service interface {
SyncRelatedWorkflowResources(ctx context.Context, appID int64, relatedWorkflows map[int64]entity.IDVersionPair, related vo.ExternalResourceRelated) error
ConversationService
BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) error
GetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error)
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,646 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package compose
import (
"context"
"fmt"
"runtime/debug"
"github.com/cloudwego/eino/compose"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/model"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/batch"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/code"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/conversation"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/database"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/emitter"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/entry"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/httprequester"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/intentdetector"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/json"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/knowledge"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/llm"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/loop"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/plugin"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/qa"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/receiver"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/selector"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/subworkflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/textprocessor"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/variableaggregator"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/variableassigner"
"github.com/coze-dev/coze-studio/backend/pkg/ctxcache"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
"github.com/coze-dev/coze-studio/backend/pkg/safego"
"github.com/coze-dev/coze-studio/backend/types/errno"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/variable"
)
type NodeSchema struct {
Key vo.NodeKey `json:"key"`
Name string `json:"name"`
Type entity.NodeType `json:"type"`
// Configs are node specific configurations with pre-defined config key and config value.
// Will not participate in request-time field mapping, nor as node's static values.
// In a word, these Configs are INTERNAL to node's implementation, the workflow layer is not aware of them.
Configs any `json:"configs,omitempty"`
InputTypes map[string]*vo.TypeInfo `json:"input_types,omitempty"`
InputSources []*vo.FieldInfo `json:"input_sources,omitempty"`
OutputTypes map[string]*vo.TypeInfo `json:"output_types,omitempty"`
OutputSources []*vo.FieldInfo `json:"output_sources,omitempty"` // only applicable to composite nodes such as Batch or Loop
ExceptionConfigs *ExceptionConfig `json:"exception_configs,omitempty"` // generic configurations applicable to most nodes
StreamConfigs *StreamConfig `json:"stream_configs,omitempty"`
SubWorkflowBasic *entity.WorkflowBasic `json:"sub_workflow_basic,omitempty"`
SubWorkflowSchema *WorkflowSchema `json:"sub_workflow_schema,omitempty"`
Lambda *compose.Lambda // not serializable, used for internal test.
}
type ExceptionConfig struct {
TimeoutMS int64 `json:"timeout_ms,omitempty"` // timeout in milliseconds, 0 means no timeout
MaxRetry int64 `json:"max_retry,omitempty"` // max retry times, 0 means no retry
ProcessType *vo.ErrorProcessType `json:"process_type,omitempty"` // error process type, 0 means throw error
DataOnErr string `json:"data_on_err,omitempty"` // data to return when error, effective when ProcessType==Default occurs
}
type StreamConfig struct {
// whether this node has the ability to produce genuine streaming output.
// not include nodes that only passes stream down as they receives them
CanGeneratesStream bool `json:"can_generates_stream,omitempty"`
// whether this node prioritize streaming input over none-streaming input.
// not include nodes that can accept both and does not have preference.
RequireStreamingInput bool `json:"can_process_stream,omitempty"`
}
type Node struct {
Lambda *compose.Lambda
}
func (s *NodeSchema) New(ctx context.Context, inner compose.Runnable[map[string]any, map[string]any],
sc *WorkflowSchema, deps *dependencyInfo) (_ *Node, err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = safego.NewPanicErr(panicErr, debug.Stack())
}
if err != nil {
err = vo.WrapIfNeeded(errno.ErrCreateNodeFail, err, errorx.KV("node_name", s.Name), errorx.KV("cause", err.Error()))
}
}()
if m := entity.NodeMetaByNodeType(s.Type); m != nil && m.InputSourceAware {
if err = s.SetFullSources(sc.GetAllNodes(), deps); err != nil {
return nil, err
}
}
switch s.Type {
case entity.NodeTypeLambda:
if s.Lambda == nil {
return nil, fmt.Errorf("lambda is not defined for NodeTypeLambda")
}
return &Node{Lambda: s.Lambda}, nil
case entity.NodeTypeLLM:
conf, err := s.ToLLMConfig(ctx)
if err != nil {
return nil, err
}
l, err := llm.New(ctx, conf)
if err != nil {
return nil, err
}
return invokableStreamableNodeWO(s, l.Chat, l.ChatStream, withCallbackOutputConverter(l.ToCallbackOutput)), nil
case entity.NodeTypeSelector:
conf := s.ToSelectorConfig()
sl, err := selector.NewSelector(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, sl.Select, withCallbackInputConverter(s.toSelectorCallbackInput(sc)), withCallbackOutputConverter(sl.ToCallbackOutput)), nil
case entity.NodeTypeBatch:
if inner == nil {
return nil, fmt.Errorf("inner workflow must not be nil when creating batch node")
}
conf, err := s.ToBatchConfig(inner)
if err != nil {
return nil, err
}
b, err := batch.NewBatch(ctx, conf)
if err != nil {
return nil, err
}
return invokableNodeWO(s, b.Execute, withCallbackInputConverter(b.ToCallbackInput)), nil
case entity.NodeTypeVariableAggregator:
conf, err := s.ToVariableAggregatorConfig()
if err != nil {
return nil, err
}
va, err := variableaggregator.NewVariableAggregator(ctx, conf)
if err != nil {
return nil, err
}
return invokableTransformableNode(s, va.Invoke, va.Transform,
withCallbackInputConverter(va.ToCallbackInput),
withCallbackOutputConverter(va.ToCallbackOutput),
withInit(va.Init)), nil
case entity.NodeTypeTextProcessor:
conf, err := s.ToTextProcessorConfig()
if err != nil {
return nil, err
}
tp, err := textprocessor.NewTextProcessor(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, tp.Invoke), nil
case entity.NodeTypeHTTPRequester:
conf, err := s.ToHTTPRequesterConfig()
if err != nil {
return nil, err
}
hr, err := httprequester.NewHTTPRequester(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, hr.Invoke, withCallbackInputConverter(hr.ToCallbackInput), withCallbackOutputConverter(hr.ToCallbackOutput)), nil
case entity.NodeTypeContinue:
i := func(ctx context.Context, in map[string]any) (map[string]any, error) {
return map[string]any{}, nil
}
return invokableNode(s, i), nil
case entity.NodeTypeBreak:
b, err := loop.NewBreak(ctx, &nodes.ParentIntermediateStore{})
if err != nil {
return nil, err
}
return invokableNode(s, b.DoBreak), nil
case entity.NodeTypeVariableAssigner:
handler := variable.GetVariableHandler()
conf, err := s.ToVariableAssignerConfig(handler)
if err != nil {
return nil, err
}
va, err := variableassigner.NewVariableAssigner(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, va.Assign), nil
case entity.NodeTypeVariableAssignerWithinLoop:
conf, err := s.ToVariableAssignerInLoopConfig()
if err != nil {
return nil, err
}
va, err := variableassigner.NewVariableAssignerInLoop(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, va.Assign), nil
case entity.NodeTypeLoop:
conf, err := s.ToLoopConfig(inner)
if err != nil {
return nil, err
}
l, err := loop.NewLoop(ctx, conf)
if err != nil {
return nil, err
}
return invokableNodeWO(s, l.Execute, withCallbackInputConverter(l.ToCallbackInput)), nil
case entity.NodeTypeQuestionAnswer:
conf, err := s.ToQAConfig(ctx)
if err != nil {
return nil, err
}
qA, err := qa.NewQuestionAnswer(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, qA.Execute, withCallbackOutputConverter(qA.ToCallbackOutput)), nil
case entity.NodeTypeInputReceiver:
conf, err := s.ToInputReceiverConfig()
if err != nil {
return nil, err
}
inputR, err := receiver.New(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, inputR.Invoke, withCallbackOutputConverter(inputR.ToCallbackOutput)), nil
case entity.NodeTypeOutputEmitter:
conf, err := s.ToOutputEmitterConfig(sc)
if err != nil {
return nil, err
}
e, err := emitter.New(ctx, conf)
if err != nil {
return nil, err
}
return invokableTransformableNode(s, e.Emit, e.EmitStream), nil
case entity.NodeTypeEntry:
conf, err := s.ToEntryConfig(ctx)
if err != nil {
return nil, err
}
e, err := entry.NewEntry(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, e.Invoke), nil
case entity.NodeTypeExit:
terminalPlan := mustGetKey[vo.TerminatePlan]("TerminalPlan", s.Configs)
if terminalPlan == vo.ReturnVariables {
i := func(ctx context.Context, in map[string]any) (map[string]any, error) {
if in == nil {
return map[string]any{}, nil
}
return in, nil
}
return invokableNode(s, i), nil
}
conf, err := s.ToOutputEmitterConfig(sc)
if err != nil {
return nil, err
}
e, err := emitter.New(ctx, conf)
if err != nil {
return nil, err
}
return invokableTransformableNode(s, e.Emit, e.EmitStream), nil
case entity.NodeTypeDatabaseCustomSQL:
conf, err := s.ToDatabaseCustomSQLConfig()
if err != nil {
return nil, err
}
sqlER, err := database.NewCustomSQL(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, sqlER.Execute), nil
case entity.NodeTypeDatabaseQuery:
conf, err := s.ToDatabaseQueryConfig()
if err != nil {
return nil, err
}
query, err := database.NewQuery(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, query.Query, withCallbackInputConverter(query.ToCallbackInput)), nil
case entity.NodeTypeDatabaseInsert:
conf, err := s.ToDatabaseInsertConfig()
if err != nil {
return nil, err
}
insert, err := database.NewInsert(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, insert.Insert, withCallbackInputConverter(insert.ToCallbackInput)), nil
case entity.NodeTypeDatabaseUpdate:
conf, err := s.ToDatabaseUpdateConfig()
if err != nil {
return nil, err
}
update, err := database.NewUpdate(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, update.Update, withCallbackInputConverter(update.ToCallbackInput)), nil
case entity.NodeTypeDatabaseDelete:
conf, err := s.ToDatabaseDeleteConfig()
if err != nil {
return nil, err
}
del, err := database.NewDelete(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, del.Delete, withCallbackInputConverter(del.ToCallbackInput)), nil
case entity.NodeTypeKnowledgeIndexer:
conf, err := s.ToKnowledgeIndexerConfig()
if err != nil {
return nil, err
}
w, err := knowledge.NewKnowledgeIndexer(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, w.Store), nil
case entity.NodeTypeKnowledgeRetriever:
conf, err := s.ToKnowledgeRetrieveConfig()
if err != nil {
return nil, err
}
r, err := knowledge.NewKnowledgeRetrieve(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Retrieve), nil
case entity.NodeTypeKnowledgeDeleter:
conf, err := s.ToKnowledgeDeleterConfig()
if err != nil {
return nil, err
}
r, err := knowledge.NewKnowledgeDeleter(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Delete), nil
case entity.NodeTypeCodeRunner:
conf, err := s.ToCodeRunnerConfig()
if err != nil {
return nil, err
}
r, err := code.NewCodeRunner(ctx, conf)
if err != nil {
return nil, err
}
initFn := func(ctx context.Context) (context.Context, error) {
return ctxcache.Init(ctx), nil
}
return invokableNode(s, r.RunCode, withCallbackOutputConverter(r.ToCallbackOutput), withInit(initFn)), nil
case entity.NodeTypePlugin:
conf, err := s.ToPluginConfig()
if err != nil {
return nil, err
}
r, err := plugin.NewPlugin(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Invoke), nil
case entity.NodeTypeCreateConversation:
conf, err := s.ToCreateConversationConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewCreateConversation(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Create), nil
case entity.NodeTypeConversationUpdate:
r := conversation.NewUpdateConversation(ctx)
if err != nil {
return nil, err
}
return invokableNode(s, r.Update), nil
case entity.NodeTypeConversationList:
r, err := conversation.NewConversationList(ctx)
if err != nil {
return nil, err
}
return invokableNode(s, r.List), nil
case entity.NodeTypeConversationDelete:
r := conversation.NewDeleteConversation(ctx)
if err != nil {
return nil, err
}
return invokableNode(s, r.Delete), nil
case entity.NodeTypeCreateMessage:
conf, err := s.ToCreateMessageConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewCreateMessage(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Create), nil
case entity.NodeTypeClearConversationHistory:
cfg, err := s.ToClearConversationHistoryConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewClearConversationHistory(ctx, cfg)
if err != nil {
return nil, err
}
return invokableNode(s, r.Clear), nil
case entity.NodeTypeConversationHistory:
cfg, err := s.ToConversationHistoryConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewConversationHistory(ctx, cfg)
if err != nil {
return nil, err
}
return invokableNode(s, r.HistoryMessages), nil
case entity.NodeTypeMessageList:
conf, err := s.ToMessageListConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewMessageList(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.List), nil
case entity.NodeTypeDeleteMessage:
conf, err := s.ToDeleteMessageConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewDeleteMessage(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Delete), nil
case entity.NodeTypeEditMessage:
conf, err := s.ToEditMessageConfig()
if err != nil {
return nil, err
}
r, err := conversation.NewEditMessage(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Edit), nil
case entity.NodeTypeIntentDetector:
conf, err := s.ToIntentDetectorConfig(ctx)
if err != nil {
return nil, err
}
r, err := intentdetector.NewIntentDetector(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, r.Invoke), nil
case entity.NodeTypeSubWorkflow:
conf, err := s.ToSubWorkflowConfig(ctx, sc.requireCheckPoint)
if err != nil {
return nil, err
}
r, err := subworkflow.NewSubWorkflow(ctx, conf)
if err != nil {
return nil, err
}
return invokableStreamableNodeWO(s, r.Invoke, r.Stream), nil
case entity.NodeTypeJsonSerialization:
conf, err := s.ToJsonSerializationConfig()
if err != nil {
return nil, err
}
js, err := json.NewJsonSerializer(ctx, conf)
if err != nil {
return nil, err
}
return invokableNode(s, js.Invoke), nil
case entity.NodeTypeJsonDeserialization:
conf, err := s.ToJsonDeserializationConfig()
if err != nil {
return nil, err
}
jd, err := json.NewJsonDeserializer(ctx, conf)
if err != nil {
return nil, err
}
initFn := func(ctx context.Context) (context.Context, error) {
return ctxcache.Init(ctx), nil
}
return invokableNode(s, jd.Invoke, withCallbackOutputConverter(jd.ToCallbackOutput), withInit(initFn)), nil
default:
panic("not implemented")
}
}
func (s *NodeSchema) IsEnableUserQuery() bool {
if s == nil {
return false
}
if s.Type != entity.NodeTypeEntry {
return false
}
if len(s.OutputSources) == 0 {
return false
}
for _, source := range s.OutputSources {
fieldPath := source.Path
if len(fieldPath) == 1 && (fieldPath[0] == "BOT_USER_INPUT" || fieldPath[0] == "USER_INPUT") {
return true
}
}
return false
}
func (s *NodeSchema) IsEnableChatHistory() bool {
if s == nil {
return false
}
switch s.Type {
case entity.NodeTypeLLM:
llmParam := mustGetKey[*model.LLMParams]("LLMParams", s.Configs)
return llmParam.EnableChatHistory
case entity.NodeTypeIntentDetector:
llmParam := mustGetKey[*model.LLMParams]("LLMParams", s.Configs)
return llmParam.EnableChatHistory
default:
return false
}
}
func (s *NodeSchema) IsRefGlobalVariable() bool {
for _, source := range s.InputSources {
if source.IsRefGlobalVariable() {
return true
}
}
for _, source := range s.OutputSources {
if source.IsRefGlobalVariable() {
return true
}
}
return false
}
func (s *NodeSchema) requireCheckpoint() bool {
if s.Type == entity.NodeTypeQuestionAnswer || s.Type == entity.NodeTypeInputReceiver {
return true
}
if s.Type == entity.NodeTypeLLM {
fcParams := getKeyOrZero[*vo.FCParam]("FCParam", s.Configs)
if fcParams != nil && fcParams.WorkflowFCParam != nil {
return true
}
}
if s.Type == entity.NodeTypeSubWorkflow {
s.SubWorkflowSchema.Init()
if s.SubWorkflowSchema.requireCheckPoint {
return true
}
}
return false
}

View File

@ -0,0 +1,692 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package compose
import (
"context"
"errors"
"fmt"
"runtime/debug"
"strconv"
"time"
einomodel "github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
workflow3 "github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
workflow2 "github.com/coze-dev/coze-studio/backend/domain/workflow"
crosscode "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/code"
crossconversation "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
crossdatabase "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/database"
crossknowledge "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/knowledge"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/model"
crossplugin "github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/plugin"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/variable"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/batch"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/code"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/conversation"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/database"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/emitter"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/entry"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/httprequester"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/intentdetector"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/json"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/knowledge"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/llm"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/loop"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/plugin"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/qa"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/receiver"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/selector"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/subworkflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/textprocessor"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/variableaggregator"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/variableassigner"
"github.com/coze-dev/coze-studio/backend/infra/contract/coderunner"
"github.com/coze-dev/coze-studio/backend/infra/contract/modelmgr"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-studio/backend/pkg/safego"
)
func (s *NodeSchema) ToEntryConfig(_ context.Context) (*entry.Config, error) {
return &entry.Config{
DefaultValues: getKeyOrZero[map[string]any]("DefaultValues", s.Configs),
OutputTypes: s.OutputTypes,
}, nil
}
func (s *NodeSchema) ToLLMConfig(ctx context.Context) (*llm.Config, error) {
llmConf := &llm.Config{
SystemPrompt: getKeyOrZero[string]("SystemPrompt", s.Configs),
UserPrompt: getKeyOrZero[string]("UserPrompt", s.Configs),
OutputFormat: mustGetKey[llm.Format]("OutputFormat", s.Configs),
InputFields: s.InputTypes,
OutputFields: s.OutputTypes,
FullSources: getKeyOrZero[map[string]*nodes.SourceInfo]("FullSources", s.Configs),
}
llmParams := getKeyOrZero[*model.LLMParams]("LLMParams", s.Configs)
if llmParams == nil {
return nil, fmt.Errorf("llm node llmParams is required")
}
var (
err error
chatModel, fallbackM einomodel.BaseChatModel
info, fallbackI *modelmgr.Model
modelWithInfo llm.ModelWithInfo
)
chatModel, info, err = model.GetManager().GetModel(ctx, llmParams)
if err != nil {
return nil, err
}
metaConfigs := s.ExceptionConfigs
if metaConfigs != nil && metaConfigs.MaxRetry > 0 {
backupModelParams := getKeyOrZero[*model.LLMParams]("BackupLLMParams", s.Configs)
if backupModelParams != nil {
fallbackM, fallbackI, err = model.GetManager().GetModel(ctx, backupModelParams)
if err != nil {
return nil, err
}
}
}
if fallbackM == nil {
modelWithInfo = llm.NewModel(chatModel, info)
} else {
modelWithInfo = llm.NewModelWithFallback(chatModel, fallbackM, info, fallbackI)
}
llmConf.ChatModel = modelWithInfo
fcParams := getKeyOrZero[*vo.FCParam]("FCParam", s.Configs)
if fcParams != nil {
if fcParams.WorkflowFCParam != nil {
for _, wf := range fcParams.WorkflowFCParam.WorkflowList {
wfIDStr := wf.WorkflowID
wfID, err := strconv.ParseInt(wfIDStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid workflow id: %s", wfIDStr)
}
workflowToolConfig := vo.WorkflowToolConfig{}
if wf.FCSetting != nil {
workflowToolConfig.InputParametersConfig = wf.FCSetting.RequestParameters
workflowToolConfig.OutputParametersConfig = wf.FCSetting.ResponseParameters
}
locator := vo.FromDraft
if wf.WorkflowVersion != "" {
locator = vo.FromSpecificVersion
}
wfTool, err := workflow2.GetRepository().WorkflowAsTool(ctx, vo.GetPolicy{
ID: wfID,
QType: locator,
Version: wf.WorkflowVersion,
}, workflowToolConfig)
if err != nil {
return nil, err
}
llmConf.Tools = append(llmConf.Tools, wfTool)
if wfTool.TerminatePlan() == vo.UseAnswerContent {
if llmConf.ToolsReturnDirectly == nil {
llmConf.ToolsReturnDirectly = make(map[string]bool)
}
toolInfo, err := wfTool.Info(ctx)
if err != nil {
return nil, err
}
llmConf.ToolsReturnDirectly[toolInfo.Name] = true
}
}
}
if fcParams.PluginFCParam != nil {
pluginToolsInvokableReq := make(map[int64]*crossplugin.ToolsInvokableRequest)
for _, p := range fcParams.PluginFCParam.PluginList {
pid, err := strconv.ParseInt(p.PluginID, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid plugin id: %s", p.PluginID)
}
toolID, err := strconv.ParseInt(p.ApiId, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid plugin id: %s", p.PluginID)
}
var (
requestParameters []*workflow3.APIParameter
responseParameters []*workflow3.APIParameter
)
if p.FCSetting != nil {
requestParameters = p.FCSetting.RequestParameters
responseParameters = p.FCSetting.ResponseParameters
}
if req, ok := pluginToolsInvokableReq[pid]; ok {
req.ToolsInvokableInfo[toolID] = &crossplugin.ToolsInvokableInfo{
ToolID: toolID,
RequestAPIParametersConfig: requestParameters,
ResponseAPIParametersConfig: responseParameters,
}
} else {
pluginToolsInfoRequest := &crossplugin.ToolsInvokableRequest{
PluginEntity: crossplugin.Entity{
PluginID: pid,
PluginVersion: ptr.Of(p.PluginVersion),
},
ToolsInvokableInfo: map[int64]*crossplugin.ToolsInvokableInfo{
toolID: {
ToolID: toolID,
RequestAPIParametersConfig: requestParameters,
ResponseAPIParametersConfig: responseParameters,
},
},
IsDraft: p.IsDraft,
}
pluginToolsInvokableReq[pid] = pluginToolsInfoRequest
}
}
inInvokableTools := make([]tool.BaseTool, 0, len(fcParams.PluginFCParam.PluginList))
for _, req := range pluginToolsInvokableReq {
toolMap, err := crossplugin.GetPluginService().GetPluginInvokableTools(ctx, req)
if err != nil {
return nil, err
}
for _, t := range toolMap {
inInvokableTools = append(inInvokableTools, crossplugin.NewInvokableTool(t))
}
}
if len(inInvokableTools) > 0 {
llmConf.Tools = inInvokableTools
}
}
if fcParams.KnowledgeFCParam != nil && len(fcParams.KnowledgeFCParam.KnowledgeList) > 0 {
kwChatModel, err := knowledgeRecallChatModel(ctx)
if err != nil {
return nil, err
}
knowledgeOperator := crossknowledge.GetKnowledgeOperator()
setting := fcParams.KnowledgeFCParam.GlobalSetting
cfg := &llm.KnowledgeRecallConfig{
ChatModel: kwChatModel,
Retriever: knowledgeOperator,
}
searchType, err := totRetrievalSearchType(setting.SearchMode)
if err != nil {
return nil, err
}
cfg.RetrievalStrategy = &llm.RetrievalStrategy{
RetrievalStrategy: &crossknowledge.RetrievalStrategy{
TopK: ptr.Of(setting.TopK),
MinScore: ptr.Of(setting.MinScore),
SearchType: searchType,
EnableNL2SQL: setting.UseNL2SQL,
EnableQueryRewrite: setting.UseRewrite,
EnableRerank: setting.UseRerank,
},
NoReCallReplyMode: llm.NoReCallReplyMode(setting.NoRecallReplyMode),
NoReCallReplyCustomizePrompt: setting.NoRecallReplyCustomizePrompt,
}
knowledgeIDs := make([]int64, 0, len(fcParams.KnowledgeFCParam.KnowledgeList))
for _, kw := range fcParams.KnowledgeFCParam.KnowledgeList {
kid, err := strconv.ParseInt(kw.ID, 10, 64)
if err != nil {
return nil, err
}
knowledgeIDs = append(knowledgeIDs, kid)
}
detailResp, err := knowledgeOperator.ListKnowledgeDetail(ctx, &crossknowledge.ListKnowledgeDetailRequest{
KnowledgeIDs: knowledgeIDs,
})
if err != nil {
return nil, err
}
cfg.SelectedKnowledgeDetails = detailResp.KnowledgeDetails
llmConf.KnowledgeRecallConfig = cfg
}
}
return llmConf, nil
}
func (s *NodeSchema) ToSelectorConfig() *selector.Config {
return &selector.Config{
Clauses: mustGetKey[[]*selector.OneClauseSchema]("Clauses", s.Configs),
}
}
func (s *NodeSchema) SelectorInputConverter(in map[string]any) (out []selector.Operants, err error) {
conf := mustGetKey[[]*selector.OneClauseSchema]("Clauses", s.Configs)
for i, oneConf := range conf {
if oneConf.Single != nil {
left, ok := nodes.TakeMapValue(in, compose.FieldPath{strconv.Itoa(i), selector.LeftKey})
if !ok {
return nil, fmt.Errorf("failed to take left operant from input map: %v, clause index= %d", in, i)
}
right, ok := nodes.TakeMapValue(in, compose.FieldPath{strconv.Itoa(i), selector.RightKey})
if ok {
out = append(out, selector.Operants{Left: left, Right: right})
} else {
out = append(out, selector.Operants{Left: left})
}
} else if oneConf.Multi != nil {
multiClause := make([]*selector.Operants, 0)
for j := range oneConf.Multi.Clauses {
left, ok := nodes.TakeMapValue(in, compose.FieldPath{strconv.Itoa(i), strconv.Itoa(j), selector.LeftKey})
if !ok {
return nil, fmt.Errorf("failed to take left operant from input map: %v, clause index= %d, single clause index= %d", in, i, j)
}
right, ok := nodes.TakeMapValue(in, compose.FieldPath{strconv.Itoa(i), strconv.Itoa(j), selector.RightKey})
if ok {
multiClause = append(multiClause, &selector.Operants{Left: left, Right: right})
} else {
multiClause = append(multiClause, &selector.Operants{Left: left})
}
}
out = append(out, selector.Operants{Multi: multiClause})
} else {
return nil, fmt.Errorf("invalid clause config, both single and multi are nil: %v", oneConf)
}
}
return out, nil
}
func (s *NodeSchema) ToBatchConfig(inner compose.Runnable[map[string]any, map[string]any]) (*batch.Config, error) {
conf := &batch.Config{
BatchNodeKey: s.Key,
InnerWorkflow: inner,
Outputs: s.OutputSources,
}
for key, tInfo := range s.InputTypes {
if tInfo.Type != vo.DataTypeArray {
continue
}
conf.InputArrays = append(conf.InputArrays, key)
}
return conf, nil
}
func (s *NodeSchema) ToVariableAggregatorConfig() (*variableaggregator.Config, error) {
return &variableaggregator.Config{
MergeStrategy: s.Configs.(map[string]any)["MergeStrategy"].(variableaggregator.MergeStrategy),
GroupLen: s.Configs.(map[string]any)["GroupToLen"].(map[string]int),
FullSources: getKeyOrZero[map[string]*nodes.SourceInfo]("FullSources", s.Configs),
NodeKey: s.Key,
InputSources: s.InputSources,
GroupOrder: mustGetKey[[]string]("GroupOrder", s.Configs),
}, nil
}
func (s *NodeSchema) variableAggregatorInputConverter(in map[string]any) (converted map[string]map[int]any) {
converted = make(map[string]map[int]any)
for k, value := range in {
m, ok := value.(map[string]any)
if !ok {
panic(errors.New("value is not a map[string]any"))
}
converted[k] = make(map[int]any, len(m))
for i, sv := range m {
index, err := strconv.Atoi(i)
if err != nil {
panic(fmt.Errorf(" converting %s to int failed, err=%v", i, err))
}
converted[k][index] = sv
}
}
return converted
}
func (s *NodeSchema) variableAggregatorStreamInputConverter(in *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]map[int]any] {
converter := func(input map[string]any) (output map[string]map[int]any, err error) {
defer func() {
if r := recover(); r != nil {
err = safego.NewPanicErr(r, debug.Stack())
}
}()
return s.variableAggregatorInputConverter(input), nil
}
return schema.StreamReaderWithConvert(in, converter)
}
func (s *NodeSchema) ToTextProcessorConfig() (*textprocessor.Config, error) {
return &textprocessor.Config{
Type: s.Configs.(map[string]any)["Type"].(textprocessor.Type),
Tpl: getKeyOrZero[string]("Tpl", s.Configs.(map[string]any)),
ConcatChar: getKeyOrZero[string]("ConcatChar", s.Configs.(map[string]any)),
Separators: getKeyOrZero[[]string]("Separators", s.Configs.(map[string]any)),
FullSources: getKeyOrZero[map[string]*nodes.SourceInfo]("FullSources", s.Configs),
}, nil
}
func (s *NodeSchema) ToJsonSerializationConfig() (*json.SerializationConfig, error) {
return &json.SerializationConfig{
InputTypes: s.InputTypes,
}, nil
}
func (s *NodeSchema) ToJsonDeserializationConfig() (*json.DeserializationConfig, error) {
return &json.DeserializationConfig{
OutputFields: s.OutputTypes,
}, nil
}
func (s *NodeSchema) ToHTTPRequesterConfig() (*httprequester.Config, error) {
return &httprequester.Config{
URLConfig: mustGetKey[httprequester.URLConfig]("URLConfig", s.Configs),
AuthConfig: getKeyOrZero[*httprequester.AuthenticationConfig]("AuthConfig", s.Configs),
BodyConfig: mustGetKey[httprequester.BodyConfig]("BodyConfig", s.Configs),
Method: mustGetKey[string]("Method", s.Configs),
Timeout: mustGetKey[time.Duration]("Timeout", s.Configs),
RetryTimes: mustGetKey[uint64]("RetryTimes", s.Configs),
MD5FieldMapping: mustGetKey[httprequester.MD5FieldMapping]("MD5FieldMapping", s.Configs),
}, nil
}
func (s *NodeSchema) ToVariableAssignerConfig(handler *variable.Handler) (*variableassigner.Config, error) {
return &variableassigner.Config{
Pairs: s.Configs.([]*variableassigner.Pair),
Handler: handler,
}, nil
}
func (s *NodeSchema) ToVariableAssignerInLoopConfig() (*variableassigner.Config, error) {
return &variableassigner.Config{
Pairs: s.Configs.([]*variableassigner.Pair),
}, nil
}
func (s *NodeSchema) ToLoopConfig(inner compose.Runnable[map[string]any, map[string]any]) (*loop.Config, error) {
conf := &loop.Config{
LoopNodeKey: s.Key,
LoopType: mustGetKey[loop.Type]("LoopType", s.Configs),
IntermediateVars: getKeyOrZero[map[string]*vo.TypeInfo]("IntermediateVars", s.Configs),
Outputs: s.OutputSources,
Inner: inner,
}
for key, tInfo := range s.InputTypes {
if tInfo.Type != vo.DataTypeArray {
continue
}
if _, ok := conf.IntermediateVars[key]; ok { // exclude arrays in intermediate vars
continue
}
conf.InputArrays = append(conf.InputArrays, key)
}
return conf, nil
}
func (s *NodeSchema) ToQAConfig(ctx context.Context) (*qa.Config, error) {
conf := &qa.Config{
QuestionTpl: mustGetKey[string]("QuestionTpl", s.Configs),
AnswerType: mustGetKey[qa.AnswerType]("AnswerType", s.Configs),
ChoiceType: getKeyOrZero[qa.ChoiceType]("ChoiceType", s.Configs),
FixedChoices: getKeyOrZero[[]string]("FixedChoices", s.Configs),
ExtractFromAnswer: getKeyOrZero[bool]("ExtractFromAnswer", s.Configs),
MaxAnswerCount: getKeyOrZero[int]("MaxAnswerCount", s.Configs),
AdditionalSystemPromptTpl: getKeyOrZero[string]("AdditionalSystemPromptTpl", s.Configs),
OutputFields: s.OutputTypes,
NodeKey: s.Key,
}
llmParams := getKeyOrZero[*model.LLMParams]("LLMParams", s.Configs)
if llmParams != nil {
m, _, err := model.GetManager().GetModel(ctx, llmParams)
if err != nil {
return nil, err
}
conf.Model = m
}
return conf, nil
}
func (s *NodeSchema) ToInputReceiverConfig() (*receiver.Config, error) {
return &receiver.Config{
OutputTypes: s.OutputTypes,
NodeKey: s.Key,
OutputSchema: mustGetKey[string]("OutputSchema", s.Configs),
}, nil
}
func (s *NodeSchema) ToOutputEmitterConfig(sc *WorkflowSchema) (*emitter.Config, error) {
conf := &emitter.Config{
Template: getKeyOrZero[string]("Template", s.Configs),
FullSources: getKeyOrZero[map[string]*nodes.SourceInfo]("FullSources", s.Configs),
}
return conf, nil
}
func (s *NodeSchema) ToDatabaseCustomSQLConfig() (*database.CustomSQLConfig, error) {
return &database.CustomSQLConfig{
DatabaseInfoID: mustGetKey[int64]("DatabaseInfoID", s.Configs),
SQLTemplate: mustGetKey[string]("SQLTemplate", s.Configs),
OutputConfig: s.OutputTypes,
CustomSQLExecutor: crossdatabase.GetDatabaseOperator(),
}, nil
}
func (s *NodeSchema) ToDatabaseQueryConfig() (*database.QueryConfig, error) {
return &database.QueryConfig{
DatabaseInfoID: mustGetKey[int64]("DatabaseInfoID", s.Configs),
QueryFields: getKeyOrZero[[]string]("QueryFields", s.Configs),
OrderClauses: getKeyOrZero[[]*crossdatabase.OrderClause]("OrderClauses", s.Configs),
ClauseGroup: getKeyOrZero[*crossdatabase.ClauseGroup]("ClauseGroup", s.Configs),
OutputConfig: s.OutputTypes,
Limit: mustGetKey[int64]("Limit", s.Configs),
Op: crossdatabase.GetDatabaseOperator(),
}, nil
}
func (s *NodeSchema) ToDatabaseInsertConfig() (*database.InsertConfig, error) {
return &database.InsertConfig{
DatabaseInfoID: mustGetKey[int64]("DatabaseInfoID", s.Configs),
OutputConfig: s.OutputTypes,
Inserter: crossdatabase.GetDatabaseOperator(),
}, nil
}
func (s *NodeSchema) ToDatabaseDeleteConfig() (*database.DeleteConfig, error) {
return &database.DeleteConfig{
DatabaseInfoID: mustGetKey[int64]("DatabaseInfoID", s.Configs),
ClauseGroup: mustGetKey[*crossdatabase.ClauseGroup]("ClauseGroup", s.Configs),
OutputConfig: s.OutputTypes,
Deleter: crossdatabase.GetDatabaseOperator(),
}, nil
}
func (s *NodeSchema) ToDatabaseUpdateConfig() (*database.UpdateConfig, error) {
return &database.UpdateConfig{
DatabaseInfoID: mustGetKey[int64]("DatabaseInfoID", s.Configs),
ClauseGroup: mustGetKey[*crossdatabase.ClauseGroup]("ClauseGroup", s.Configs),
OutputConfig: s.OutputTypes,
Updater: crossdatabase.GetDatabaseOperator(),
}, nil
}
func (s *NodeSchema) ToKnowledgeIndexerConfig() (*knowledge.IndexerConfig, error) {
return &knowledge.IndexerConfig{
KnowledgeID: mustGetKey[int64]("KnowledgeID", s.Configs),
ParsingStrategy: mustGetKey[*crossknowledge.ParsingStrategy]("ParsingStrategy", s.Configs),
ChunkingStrategy: mustGetKey[*crossknowledge.ChunkingStrategy]("ChunkingStrategy", s.Configs),
KnowledgeIndexer: crossknowledge.GetKnowledgeOperator(),
}, nil
}
func (s *NodeSchema) ToKnowledgeRetrieveConfig() (*knowledge.RetrieveConfig, error) {
return &knowledge.RetrieveConfig{
KnowledgeIDs: mustGetKey[[]int64]("KnowledgeIDs", s.Configs),
RetrievalStrategy: mustGetKey[*crossknowledge.RetrievalStrategy]("RetrievalStrategy", s.Configs),
Retriever: crossknowledge.GetKnowledgeOperator(),
}, nil
}
func (s *NodeSchema) ToKnowledgeDeleterConfig() (*knowledge.DeleterConfig, error) {
return &knowledge.DeleterConfig{
KnowledgeID: mustGetKey[int64]("KnowledgeID", s.Configs),
KnowledgeDeleter: crossknowledge.GetKnowledgeOperator(),
}, nil
}
func (s *NodeSchema) ToPluginConfig() (*plugin.Config, error) {
return &plugin.Config{
PluginID: mustGetKey[int64]("PluginID", s.Configs),
ToolID: mustGetKey[int64]("ToolID", s.Configs),
PluginVersion: mustGetKey[string]("PluginVersion", s.Configs),
PluginService: crossplugin.GetPluginService(),
}, nil
}
func (s *NodeSchema) ToCodeRunnerConfig() (*code.Config, error) {
return &code.Config{
Code: mustGetKey[string]("Code", s.Configs),
Language: mustGetKey[coderunner.Language]("Language", s.Configs),
OutputConfig: s.OutputTypes,
Runner: crosscode.GetCodeRunner(),
}, nil
}
func (s *NodeSchema) ToCreateConversationConfig() (*conversation.CreateConversationConfig, error) {
return &conversation.CreateConversationConfig{
Manager: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToDeleteMessageConfig() (*conversation.DeleteMessageConfig, error) {
return &conversation.DeleteMessageConfig{
Manager: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToEditMessageConfig() (*conversation.EditMessageConfig, error) {
return &conversation.EditMessageConfig{
Manager: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToCreateMessageConfig() (*conversation.CreateMessageConfig, error) {
return &conversation.CreateMessageConfig{
Creator: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToMessageListConfig() (*conversation.MessageListConfig, error) {
return &conversation.MessageListConfig{
Lister: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToClearConversationHistoryConfig() (*conversation.ClearConversationHistoryConfig, error) {
return &conversation.ClearConversationHistoryConfig{
Manager: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToConversationHistoryConfig() (*conversation.ConversationHistoryConfig, error) {
return &conversation.ConversationHistoryConfig{
Manager: crossconversation.GetConversationManager(),
}, nil
}
func (s *NodeSchema) ToIntentDetectorConfig(ctx context.Context) (*intentdetector.Config, error) {
cfg := &intentdetector.Config{
Intents: mustGetKey[[]string]("Intents", s.Configs),
SystemPrompt: getKeyOrZero[string]("SystemPrompt", s.Configs),
IsFastMode: getKeyOrZero[bool]("IsFastMode", s.Configs),
}
llmParams := mustGetKey[*model.LLMParams]("LLMParams", s.Configs)
m, _, err := model.GetManager().GetModel(ctx, llmParams)
if err != nil {
return nil, err
}
cfg.ChatModel = m
return cfg, nil
}
func (s *NodeSchema) ToSubWorkflowConfig(ctx context.Context, requireCheckpoint bool) (*subworkflow.Config, error) {
var opts []WorkflowOption
opts = append(opts, WithIDAsName(mustGetKey[int64]("WorkflowID", s.Configs)))
if requireCheckpoint {
opts = append(opts, WithParentRequireCheckpoint())
}
if s := execute.GetStaticConfig(); s != nil && s.MaxNodeCountPerWorkflow > 0 {
opts = append(opts, WithMaxNodeCount(s.MaxNodeCountPerWorkflow))
}
wf, err := NewWorkflow(ctx, s.SubWorkflowSchema, opts...)
if err != nil {
return nil, err
}
return &subworkflow.Config{
Runner: wf.Runner,
}, nil
}
func totRetrievalSearchType(s int64) (crossknowledge.SearchType, error) {
switch s {
case 0:
return crossknowledge.SearchTypeSemantic, nil
case 1:
return crossknowledge.SearchTypeHybrid, nil
case 20:
return crossknowledge.SearchTypeFullText, nil
default:
return "", fmt.Errorf("invalid retrieval search type %v", s)
}
}
// knowledgeRecallChatModel the chat model used by the knowledge base recall in the LLM node, not the user-configured model
func knowledgeRecallChatModel(ctx context.Context) (einomodel.BaseChatModel, error) {
defaultChatModelParma := &model.LLMParams{
ModelName: "豆包·1.5·Pro·32k",
ModelType: 1,
Temperature: ptr.Of(0.5),
MaxTokens: 4096,
}
m, _, err := model.GetManager().GetModel(ctx, defaultChatModelParma)
return m, err
}