Compare commits

..

1 Commits

Author SHA1 Message Date
e505507cb1 feat(backend): add license 2025-07-31 15:42:49 +08:00
25 changed files with 2785 additions and 14 deletions

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/hertz/pkg/app"
@ -1094,9 +1095,62 @@ func OpenAPIChatFlowRun(ctx context.Context, c *app.RequestContext) {
return
}
resp := new(workflow.ChatFlowRunResponse)
w := sse.NewWriter(c)
c.SetContentType("text/event-stream; charset=utf-8")
c.Response.Header.Set("Cache-Control", "no-cache")
c.Response.Header.Set("Connection", "keep-alive")
c.Response.Header.Set("Access-Control-Allow-Origin", "*")
c.JSON(consts.StatusOK, resp)
sr, err := appworkflow.SVC.OpenAPIChatFlowRun(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
sendChatFlowStreamRunSSE(ctx, w, sr)
}
func sendChatFlowStreamRunSSE(ctx context.Context, w *sse.Writer, sr *schema.StreamReader[[]*workflow.ChatFlowRunResponse]) {
defer func() {
_ = w.Close()
sr.Close()
}()
seq := int64(1)
for {
respList, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
event := &sse.Event{
Type: "error",
Data: []byte(err.Error()),
}
if err = w.Write(event); err != nil {
logs.CtxErrorf(ctx, "publish stream event failed, err:%v", err)
}
return
}
for _, resp := range respList {
event := &sse.Event{
ID: strconv.FormatInt(seq, 10),
Type: resp.Event,
Data: []byte(resp.Data),
}
if err = w.Write(event); err != nil {
logs.CtxErrorf(ctx, "publish stream event failed, err:%v", err)
return
}
seq++
}
}
}
// OpenAPIGetWorkflowInfo .
@ -1110,7 +1164,11 @@ func OpenAPIGetWorkflowInfo(ctx context.Context, c *app.RequestContext) {
return
}
resp := new(workflow.OpenAPIGetWorkflowInfoResponse)
resp, err := appworkflow.SVC.OpenAPIGetWorkflowInfo(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
c.JSON(consts.StatusOK, resp)
}
@ -1154,3 +1212,23 @@ func GetExampleWorkFlowList(ctx context.Context, c *app.RequestContext) {
c.JSON(consts.StatusOK, resp)
}
// GetOrCreateConversation .
// @router /api/workflow_api/conversation/create [POST]
func GetOrCreateConversation(ctx context.Context, c *app.RequestContext) {
var err error
var req workflow.GetOrCreateConversationRequest
err = c.BindAndValidate(&req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
resp, err := appworkflow.SVC.GetOrCreateConversation(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
c.JSON(consts.StatusOK, resp)
}

View File

@ -52,6 +52,8 @@ func (p Scene) String() string {
return "GenerateAgentInfo"
case Scene_SceneOpenApi:
return "SceneOpenApi"
case Scene_SceneWorkflow:
return "SceneWorkflow"
}
return "<UNSET>"
}
@ -78,6 +80,8 @@ func SceneFromString(s string) (Scene, error) {
return Scene_GenerateAgentInfo, nil
case "SceneOpenApi":
return Scene_SceneOpenApi, nil
case "SceneWorkflow":
return Scene_SceneWorkflow, nil
}
return Scene(0), fmt.Errorf("not a valid Scene string")
}

View File

@ -3,11 +3,11 @@
package dataset
import (
"github.com/coze-dev/coze-studio/backend/api/model/base"
"database/sql"
"database/sql/driver"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
"github.com/coze-dev/coze-studio/backend/api/model/base"
)
type ColumnType int64

View File

@ -92,6 +92,8 @@ type WorkflowService interface {
CreateChatFlowRole(ctx context.Context, request *CreateChatFlowRoleRequest) (r *CreateChatFlowRoleResponse, err error)
DeleteChatFlowRole(ctx context.Context, request *DeleteChatFlowRoleRequest) (r *DeleteChatFlowRoleResponse, err error)
GetOrCreateConversation(ctx context.Context, request *GetOrCreateConversationRequest) (r *GetOrCreateConversationResponse, err error)
// App 发布管理
ListPublishWorkflow(ctx context.Context, request *ListPublishWorkflowRequest) (r *ListPublishWorkflowResponse, err error)
// Open API
@ -503,6 +505,15 @@ func (p *WorkflowServiceClient) DeleteChatFlowRole(ctx context.Context, request
}
return _result.GetSuccess(), nil
}
func (p *WorkflowServiceClient) GetOrCreateConversation(ctx context.Context, request *GetOrCreateConversationRequest) (r *GetOrCreateConversationResponse, err error) {
var _args WorkflowServiceGetOrCreateConversationArgs
_args.Request = request
var _result WorkflowServiceGetOrCreateConversationResult
if err = p.Client_().Call(ctx, "GetOrCreateConversation", &_args, &_result); err != nil {
return
}
return _result.GetSuccess(), nil
}
func (p *WorkflowServiceClient) ListPublishWorkflow(ctx context.Context, request *ListPublishWorkflowRequest) (r *ListPublishWorkflowResponse, err error) {
var _args WorkflowServiceListPublishWorkflowArgs
_args.Request = request
@ -628,6 +639,7 @@ func NewWorkflowServiceProcessor(handler WorkflowService) *WorkflowServiceProces
self.AddToProcessorMap("GetChatFlowRole", &workflowServiceProcessorGetChatFlowRole{handler: handler})
self.AddToProcessorMap("CreateChatFlowRole", &workflowServiceProcessorCreateChatFlowRole{handler: handler})
self.AddToProcessorMap("DeleteChatFlowRole", &workflowServiceProcessorDeleteChatFlowRole{handler: handler})
self.AddToProcessorMap("GetOrCreateConversation", &workflowServiceProcessorGetOrCreateConversation{handler: handler})
self.AddToProcessorMap("ListPublishWorkflow", &workflowServiceProcessorListPublishWorkflow{handler: handler})
self.AddToProcessorMap("OpenAPIRunFlow", &workflowServiceProcessorOpenAPIRunFlow{handler: handler})
self.AddToProcessorMap("OpenAPIStreamRunFlow", &workflowServiceProcessorOpenAPIStreamRunFlow{handler: handler})
@ -2623,6 +2635,54 @@ func (p *workflowServiceProcessorDeleteChatFlowRole) Process(ctx context.Context
return true, err
}
type workflowServiceProcessorGetOrCreateConversation struct {
handler WorkflowService
}
func (p *workflowServiceProcessorGetOrCreateConversation) Process(ctx context.Context, seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := WorkflowServiceGetOrCreateConversationArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("GetOrCreateConversation", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return false, err
}
iprot.ReadMessageEnd()
var err2 error
result := WorkflowServiceGetOrCreateConversationResult{}
var retval *GetOrCreateConversationResponse
if retval, err2 = p.handler.GetOrCreateConversation(ctx, args.Request); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing GetOrCreateConversation: "+err2.Error())
oprot.WriteMessageBegin("GetOrCreateConversation", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("GetOrCreateConversation", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(ctx); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
type workflowServiceProcessorListPublishWorkflow struct {
handler WorkflowService
}
@ -14931,6 +14991,298 @@ func (p *WorkflowServiceDeleteChatFlowRoleResult) String() string {
}
type WorkflowServiceGetOrCreateConversationArgs struct {
Request *GetOrCreateConversationRequest `thrift:"request,1"`
}
func NewWorkflowServiceGetOrCreateConversationArgs() *WorkflowServiceGetOrCreateConversationArgs {
return &WorkflowServiceGetOrCreateConversationArgs{}
}
func (p *WorkflowServiceGetOrCreateConversationArgs) InitDefault() {
}
var WorkflowServiceGetOrCreateConversationArgs_Request_DEFAULT *GetOrCreateConversationRequest
func (p *WorkflowServiceGetOrCreateConversationArgs) GetRequest() (v *GetOrCreateConversationRequest) {
if !p.IsSetRequest() {
return WorkflowServiceGetOrCreateConversationArgs_Request_DEFAULT
}
return p.Request
}
var fieldIDToName_WorkflowServiceGetOrCreateConversationArgs = map[int16]string{
1: "request",
}
func (p *WorkflowServiceGetOrCreateConversationArgs) IsSetRequest() bool {
return p.Request != nil
}
func (p *WorkflowServiceGetOrCreateConversationArgs) Read(iprot thrift.TProtocol) (err error) {
var fieldTypeId thrift.TType
var fieldId int16
if _, err = iprot.ReadStructBegin(); err != nil {
goto ReadStructBeginError
}
for {
_, fieldTypeId, fieldId, err = iprot.ReadFieldBegin()
if err != nil {
goto ReadFieldBeginError
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 1:
if fieldTypeId == thrift.STRUCT {
if err = p.ReadField1(iprot); err != nil {
goto ReadFieldError
}
} else if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
default:
if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
}
if err = iprot.ReadFieldEnd(); err != nil {
goto ReadFieldEndError
}
}
if err = iprot.ReadStructEnd(); err != nil {
goto ReadStructEndError
}
return nil
ReadStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T read struct begin error: ", p), err)
ReadFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err)
ReadFieldError:
return thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_WorkflowServiceGetOrCreateConversationArgs[fieldId]), err)
SkipFieldError:
return thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err)
ReadFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T read field end error", p), err)
ReadStructEndError:
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationArgs) ReadField1(iprot thrift.TProtocol) error {
_field := NewGetOrCreateConversationRequest()
if err := _field.Read(iprot); err != nil {
return err
}
p.Request = _field
return nil
}
func (p *WorkflowServiceGetOrCreateConversationArgs) Write(oprot thrift.TProtocol) (err error) {
var fieldId int16
if err = oprot.WriteStructBegin("GetOrCreateConversation_args"); err != nil {
goto WriteStructBeginError
}
if p != nil {
if err = p.writeField1(oprot); err != nil {
fieldId = 1
goto WriteFieldError
}
}
if err = oprot.WriteFieldStop(); err != nil {
goto WriteFieldStopError
}
if err = oprot.WriteStructEnd(); err != nil {
goto WriteStructEndError
}
return nil
WriteStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
WriteFieldError:
return thrift.PrependError(fmt.Sprintf("%T write field %d error: ", p, fieldId), err)
WriteFieldStopError:
return thrift.PrependError(fmt.Sprintf("%T write field stop error: ", p), err)
WriteStructEndError:
return thrift.PrependError(fmt.Sprintf("%T write struct end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationArgs) writeField1(oprot thrift.TProtocol) (err error) {
if err = oprot.WriteFieldBegin("request", thrift.STRUCT, 1); err != nil {
goto WriteFieldBeginError
}
if err := p.Request.Write(oprot); err != nil {
return err
}
if err = oprot.WriteFieldEnd(); err != nil {
goto WriteFieldEndError
}
return nil
WriteFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T write field 1 begin error: ", p), err)
WriteFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T write field 1 end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationArgs) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("WorkflowServiceGetOrCreateConversationArgs(%+v)", *p)
}
type WorkflowServiceGetOrCreateConversationResult struct {
Success *GetOrCreateConversationResponse `thrift:"success,0,optional"`
}
func NewWorkflowServiceGetOrCreateConversationResult() *WorkflowServiceGetOrCreateConversationResult {
return &WorkflowServiceGetOrCreateConversationResult{}
}
func (p *WorkflowServiceGetOrCreateConversationResult) InitDefault() {
}
var WorkflowServiceGetOrCreateConversationResult_Success_DEFAULT *GetOrCreateConversationResponse
func (p *WorkflowServiceGetOrCreateConversationResult) GetSuccess() (v *GetOrCreateConversationResponse) {
if !p.IsSetSuccess() {
return WorkflowServiceGetOrCreateConversationResult_Success_DEFAULT
}
return p.Success
}
var fieldIDToName_WorkflowServiceGetOrCreateConversationResult = map[int16]string{
0: "success",
}
func (p *WorkflowServiceGetOrCreateConversationResult) IsSetSuccess() bool {
return p.Success != nil
}
func (p *WorkflowServiceGetOrCreateConversationResult) Read(iprot thrift.TProtocol) (err error) {
var fieldTypeId thrift.TType
var fieldId int16
if _, err = iprot.ReadStructBegin(); err != nil {
goto ReadStructBeginError
}
for {
_, fieldTypeId, fieldId, err = iprot.ReadFieldBegin()
if err != nil {
goto ReadFieldBeginError
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 0:
if fieldTypeId == thrift.STRUCT {
if err = p.ReadField0(iprot); err != nil {
goto ReadFieldError
}
} else if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
default:
if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
}
if err = iprot.ReadFieldEnd(); err != nil {
goto ReadFieldEndError
}
}
if err = iprot.ReadStructEnd(); err != nil {
goto ReadStructEndError
}
return nil
ReadStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T read struct begin error: ", p), err)
ReadFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err)
ReadFieldError:
return thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_WorkflowServiceGetOrCreateConversationResult[fieldId]), err)
SkipFieldError:
return thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err)
ReadFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T read field end error", p), err)
ReadStructEndError:
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationResult) ReadField0(iprot thrift.TProtocol) error {
_field := NewGetOrCreateConversationResponse()
if err := _field.Read(iprot); err != nil {
return err
}
p.Success = _field
return nil
}
func (p *WorkflowServiceGetOrCreateConversationResult) Write(oprot thrift.TProtocol) (err error) {
var fieldId int16
if err = oprot.WriteStructBegin("GetOrCreateConversation_result"); err != nil {
goto WriteStructBeginError
}
if p != nil {
if err = p.writeField0(oprot); err != nil {
fieldId = 0
goto WriteFieldError
}
}
if err = oprot.WriteFieldStop(); err != nil {
goto WriteFieldStopError
}
if err = oprot.WriteStructEnd(); err != nil {
goto WriteStructEndError
}
return nil
WriteStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
WriteFieldError:
return thrift.PrependError(fmt.Sprintf("%T write field %d error: ", p, fieldId), err)
WriteFieldStopError:
return thrift.PrependError(fmt.Sprintf("%T write field stop error: ", p), err)
WriteStructEndError:
return thrift.PrependError(fmt.Sprintf("%T write struct end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationResult) writeField0(oprot thrift.TProtocol) (err error) {
if p.IsSetSuccess() {
if err = oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil {
goto WriteFieldBeginError
}
if err := p.Success.Write(oprot); err != nil {
return err
}
if err = oprot.WriteFieldEnd(); err != nil {
goto WriteFieldEndError
}
}
return nil
WriteFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T write field 0 begin error: ", p), err)
WriteFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T write field 0 end error: ", p), err)
}
func (p *WorkflowServiceGetOrCreateConversationResult) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("WorkflowServiceGetOrCreateConversationResult(%+v)", *p)
}
type WorkflowServiceListPublishWorkflowArgs struct {
Request *ListPublishWorkflowRequest `thrift:"request,1"`
}

File diff suppressed because it is too large Load Diff

View File

@ -393,6 +393,10 @@ func Register(r *server.Hertz) {
_chat_flow_role.POST("/delete", append(_deletechatflowroleMw(), coze.DeleteChatFlowRole)...)
_chat_flow_role.GET("/get", append(_getchatflowroleMw(), coze.GetChatFlowRole)...)
}
{
_conversation0 := _workflow_api.Group("/conversation", _conversation0Mw()...)
_conversation0.POST("/create", append(_getorcreateconversationMw(), coze.GetOrCreateConversation)...)
}
{
_project_conversation := _workflow_api.Group("/project_conversation", _project_conversationMw()...)
_project_conversation.POST("/create", append(_createprojectconversationdefMw(), coze.CreateProjectConversationDef)...)
@ -414,10 +418,10 @@ func Register(r *server.Hertz) {
_bot0.GET("/get_online_info", append(_getbotonlineinfoMw(), coze.GetBotOnlineInfo)...)
}
{
_conversation0 := _v1.Group("/conversation", _conversation0Mw()...)
_conversation0.POST("/create", append(_createconversationMw(), coze.CreateConversation)...)
_conversation1 := _v1.Group("/conversation", _conversation1Mw()...)
_conversation1.POST("/create", append(_createconversationMw(), coze.CreateConversation)...)
{
_message := _conversation0.Group("/message", _messageMw()...)
_message := _conversation1.Group("/message", _messageMw()...)
_message.POST("/list", append(_getapimessagelistMw(), coze.GetApiMessageList)...)
}
}

View File

@ -1505,3 +1505,13 @@ func _upload1Mw() []app.HandlerFunc {
// your code...
return nil
}
func _getorcreateconversationMw() []app.HandlerFunc {
// your code...
return nil
}
func _conversation1Mw() []app.HandlerFunc {
// your code...
return nil
}

View File

@ -20,6 +20,7 @@ package router
import (
"context"
"net/http"
"os"
"path"
"strings"
@ -74,4 +75,17 @@ func staticFileRegister(r *server.Hertz) {
ctx.File(staticFile)
})
r.Handle(http.MethodPost, "api/permission_api/coze_web_app/impersonate_coze_user", func(c context.Context, ctx *app.RequestContext) {
ctx.JSON(200, map[string]any{
"code": 0,
"msg": "ok",
"data": map[string]interface{}{
"access_token": "pat_ab1212d883ba6e0a63c28dd47a6e13629f7d5885b78b55d933e9caf9aae737c2",
"expires_in": 1753998220,
"token_type": "Bearer",
},
})
})
}

View File

@ -18,14 +18,17 @@ package workflow
import (
"context"
"errors"
"fmt"
"github.com/coze-dev/coze-studio/backend/types/consts"
"runtime/debug"
"strconv"
"strings"
"github.com/cloudwego/eino/schema"
"github.com/coze-dev/coze-studio/backend/api/model/crossdomain/message"
"github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
"github.com/coze-dev/coze-studio/backend/application/base/ctxutil"
"github.com/coze-dev/coze-studio/backend/crossdomain/contract/crossmessage"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
@ -33,7 +36,11 @@ import (
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ternary"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
"github.com/coze-dev/coze-studio/backend/pkg/safego"
"github.com/coze-dev/coze-studio/backend/pkg/sonic"
"github.com/coze-dev/coze-studio/backend/pkg/taskgroup"
"github.com/coze-dev/coze-studio/backend/types/consts"
"github.com/coze-dev/coze-studio/backend/types/errno"
)
@ -262,3 +269,491 @@ func (w *ApplicationService) ListApplicationConversationDef(ctx context.Context,
return resp, nil
}
func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workflow.ChatFlowRunRequest) (
_ *schema.StreamReader[[]*workflow.ChatFlowRunResponse], err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = safego.NewPanicErr(panicErr, debug.Stack())
}
if err != nil {
err = vo.WrapIfNeeded(errno.ErrChatFlowRoleOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))
}
}()
if len(req.GetAdditionalMessages()) == 0 {
return nil, fmt.Errorf("additional_messages is requird")
}
messages := req.GetAdditionalMessages()
lastUserMessage := messages[len(req.GetAdditionalMessages())-1]
if lastUserMessage.Role != "user" {
return nil, errors.New("the role of the last day message must be user")
}
var parameters = make(map[string]any)
if len(req.GetParameters()) > 0 {
err := sonic.UnmarshalString(req.GetParameters(), parameters)
if err != nil {
return nil, err
}
}
var (
isDebug = req.GetExecuteMode() == "DEBUG"
appID, agentID *int64
resolveAppID int64
connectorID, userID, conversationID int64
version string
locator vo.Locator
)
if req.IsSetAppID() {
appID = ptr.Of(mustParseInt64(req.GetAppID()))
resolveAppID = mustParseInt64(req.GetAppID())
}
if req.IsSetBotID() {
agentID = ptr.Of(mustParseInt64(req.GetBotID()))
resolveAppID = mustParseInt64(req.GetBotID())
}
if appID != nil && agentID != nil {
return nil, errors.New("project_id and bot_id cannot be set at the same time")
}
if isDebug {
userID = ctxutil.MustGetUIDFromCtx(ctx)
connectorID = mustParseInt64(req.GetConnectorID())
locator = vo.FromDraft
} else {
apiKeyInfo := ctxutil.GetApiAuthFromCtx(ctx)
userID = apiKeyInfo.UserID
connectorID = apiKeyInfo.ConnectorID
meta, err := GetWorkflowDomainSVC().Get(ctx, &vo.GetPolicy{
ID: mustParseInt64(req.GetWorkflowID()),
MetaOnly: true,
})
if err != nil {
return nil, err
}
if meta.LatestPublishedVersion == nil {
return nil, vo.NewError(errno.ErrWorkflowNotPublished)
}
if req.IsSetVersion() {
version = req.GetVersion()
locator = vo.FromSpecificVersion
} else {
version = meta.GetLatestVersion()
locator = vo.FromLatestVersion
}
}
if req.IsSetConversationID() {
conversationID = mustParseInt64(req.GetConversationID())
} else {
conversationName, ok := parameters["CONVERSATION_NAME"].(string)
if !ok {
return nil, fmt.Errorf("conversation name is requried")
}
cID, err := GetWorkflowDomainSVC().GetOrCreateConversation(ctx, ternary.IFElse(isDebug, vo.Draft, vo.Online), resolveAppID, connectorID, userID, conversationName)
if err != nil {
return nil, err
}
conversationID = cID
}
roundID, err := w.IDGenerator.GenID(ctx)
if err != nil {
return nil, err
}
historyMessages, err := w.makeChatFlowHistoryMessages(ctx, resolveAppID, conversationID, userID, messages[:len(req.GetAdditionalMessages())-1])
if err != nil {
return nil, err
}
messageClient := crossmessage.DefaultSVC()
if len(historyMessages) > 0 {
g := taskgroup.NewTaskGroup(ctx, len(historyMessages))
for _, hm := range historyMessages {
hMsg := hm
g.Go(func() error {
_, err := messageClient.Create(ctx, hMsg)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
logs.CtxWarnf(ctx, "create history message failed, err=%v", err)
}
}
userMessage, err := toConversationMessage(ctx, resolveAppID, conversationID, userID, roundID, message.MessageTypeQuestion, lastUserMessage)
if err != nil {
return nil, err
}
_, err = messageClient.Create(ctx, userMessage)
if err != nil {
return nil, err
}
exeCfg := vo.ExecuteConfig{
ID: mustParseInt64(req.GetWorkflowID()),
From: locator,
Version: version,
Operator: userID,
Mode: ternary.IFElse(isDebug, vo.ExecuteModeDebug, vo.ExecuteModeRelease),
AppID: appID,
AgentID: agentID,
ConnectorID: connectorID,
ConnectorUID: strconv.FormatInt(userID, 10),
TaskType: vo.TaskTypeForeground,
SyncPattern: vo.SyncPatternStream,
InputFailFast: true,
BizType: vo.BizTypeWorkflow,
ConversationID: ptr.Of(conversationID),
RoundID: ptr.Of(roundID),
EnterMessage: lastUserMessage,
Cancellable: isDebug == true,
}
parameters["USER_INPUT"], err = w.makeChatFlowUserInput(ctx, lastUserMessage)
if err != nil {
return nil, err
}
sr, err := GetWorkflowDomainSVC().StreamExecute(ctx, exeCfg, parameters)
if err != nil {
return nil, err
}
return schema.StreamReaderWithConvert(sr, convertToChatFlowRunResponseList(ctx, resolveAppID, conversationID, roundID, mustParseInt64(req.GetWorkflowID()))), nil
}
func convertToChatFlowRunResponseList(ctx context.Context, appID int64, conversationID, roundID int64, workflowID int64) func(msg *entity.Message) (responses []*workflow.ChatFlowRunResponse, err error) {
var (
spaceID int64
executeID int64
hasFirstMessage = false
messageOutput string
messageID int64
outputCount int32
inputCount int32
)
var getOrUpdateMessage = func(msg string, role schema.RoleType) error {
entityMessage := &message.Message{
AgentID: appID,
RunID: roundID,
Content: msg,
ConversationID: conversationID,
ContentType: message.ContentTypeText,
Role: role,
MessageType: message.MessageTypeAnswer,
}
if hasFirstMessage {
entityMessage.ID = messageID
_, err := crossmessage.DefaultSVC().Edit(ctx, entityMessage)
if err != nil {
return err
}
} else {
m, err := crossmessage.DefaultSVC().Create(ctx, entityMessage)
if err != nil {
return err
}
messageID = m.ID
hasFirstMessage = true
}
return nil
}
return func(msg *entity.Message) (responses []*workflow.ChatFlowRunResponse, err error) {
if msg.StateMessage != nil {
if executeID > 0 && executeID != msg.StateMessage.ExecuteID {
return nil, schema.ErrNoValue
}
switch msg.StateMessage.Status {
case entity.WorkflowSuccess:
chatDoneEvent := &vo.ChatFlowDetail{
ID: strconv.FormatInt(roundID, 10),
ConversationID: strconv.FormatInt(conversationID, 10),
BotID: strconv.FormatInt(appID, 10),
Status: vo.Completed,
ExecuteID: strconv.FormatInt(executeID, 10),
Usage: &vo.Usage{
InputTokens: ptr.Of(inputCount),
OutputTokens: ptr.Of(outputCount),
TokenCount: ptr.Of(outputCount + inputCount),
},
}
data, err := sonic.MarshalString(chatDoneEvent)
if err != nil {
return nil, err
}
doneData, err := sonic.MarshalString(map[string]interface{}{
"debug_url": fmt.Sprintf(vo.DebugURLTpl, executeID, spaceID, workflowID),
})
if err != nil {
return nil, err
}
return []*workflow.ChatFlowRunResponse{
{
Event: string(vo.ChatFlowCompleted),
Data: data,
},
{
Event: string(vo.ChatFlowDone),
Data: doneData,
},
}, err
case entity.WorkflowFailed:
var wfe vo.WorkflowError
if !errors.As(msg.StateMessage.LastError, &wfe) {
panic("stream run last error is not a WorkflowError")
}
chatFailedEvent := &vo.ErrorDetail{
Code: strconv.Itoa(int(wfe.Code())),
Msg: wfe.Msg(),
DebugUrl: wfe.DebugURL(),
}
data, err := sonic.MarshalString(chatFailedEvent)
if err != nil {
return nil, err
}
return []*workflow.ChatFlowRunResponse{
{
Event: string(vo.ChatFlowError),
Data: data,
},
}, err
case entity.WorkflowCancel:
// do nothing
case entity.WorkflowInterrupted:
// interrupted
fmt.Println("workflow interrupted")
case entity.WorkflowRunning:
executeID = msg.StateMessage.ExecuteID
spaceID = msg.StateMessage.SpaceID
responses = make([]*workflow.ChatFlowRunResponse, 0)
chatEvent := &vo.ChatFlowDetail{
ID: strconv.FormatInt(roundID, 10),
ConversationID: strconv.FormatInt(conversationID, 10),
Status: vo.Created,
ExecuteID: strconv.FormatInt(executeID, 10),
}
data, err := sonic.MarshalString(chatEvent)
if err != nil {
return nil, err
}
responses = append(responses, &workflow.ChatFlowRunResponse{
Event: string(vo.ChatFlowCreated),
Data: data,
})
chatEvent.Status = vo.InProgress
data, err = sonic.MarshalString(chatEvent)
if err != nil {
return nil, err
}
responses = append(responses, &workflow.ChatFlowRunResponse{
Event: string(vo.ChatFlowInProgress),
Data: data,
})
return responses, nil
default:
return nil, schema.ErrNoValue
}
}
if msg.DataMessage != nil {
if msg.Type != entity.Answer {
return nil, schema.ErrNoValue
}
// stream run will skip all messages from workflow tools
if executeID > 0 && executeID != msg.DataMessage.ExecuteID {
return nil, schema.ErrNoValue
}
messageOutput += msg.Content
dataMessage := msg.DataMessage
if dataMessage.Usage != nil {
inputCount += int32(msg.DataMessage.Usage.InputTokens)
outputCount += int32(msg.DataMessage.Usage.OutputTokens)
}
err = getOrUpdateMessage(messageOutput, dataMessage.Role)
if err != nil {
return nil, err
}
messageEvent := &vo.MessageDetail{
ID: strconv.FormatInt(messageID, 10),
ChatID: strconv.FormatInt(roundID, 10),
ConversationID: strconv.FormatInt(conversationID, 10),
BotID: strconv.FormatInt(appID, 10),
Role: string(dataMessage.Role),
Type: string(dataMessage.Type),
Content: msg.Content,
ContentType: string(message.ContentTypeText),
}
data, err := sonic.MarshalString(messageEvent)
if err != nil {
return nil, err
}
return []*workflow.ChatFlowRunResponse{
{
Event: ternary.IFElse(msg.Last, string(vo.ChatFlowMessageCompleted), string(vo.ChatFlowMessageDelta)),
Data: data,
},
}, nil
}
return nil, err
}
}
func (w *ApplicationService) makeChatFlowUserInput(ctx context.Context, message *workflow.EnterMessage) (string, error) {
type content struct {
Type string `json:"type,omitempty"`
FileID *string `json:"file_id"`
Text *string `json:"text"`
}
if message.ContentType == "text" {
return message.Content, nil
} else if message.ContentType == "object_string" {
contents := make([]content, 0)
err := sonic.UnmarshalString(message.Content, &contents)
if err != nil {
return "", err
}
texts := make([]string, 0)
urls := make([]string, 0)
for _, ct := range contents {
if ct.Text != nil && len(*ct.Text) > 0 {
texts = append(texts, *ct.Text)
}
if ct.FileID != nil && len(*ct.FileID) > 0 {
u, err := w.ImageX.GetResourceURL(ctx, *ct.FileID)
if err != nil {
return "", err
}
urls = append(urls, u.URL)
}
}
return strings.Join(texts, ",") + strings.Join(urls, ","), nil
} else {
return "", fmt.Errorf("invalid message ccontent type %v", message.ContentType)
}
}
func (w *ApplicationService) makeChatFlowHistoryMessages(ctx context.Context, appID, conversationID int64, userID int64, messages []*workflow.EnterMessage) ([]*message.Message, error) {
var (
rID int64
err error
userRole = "user"
assistantRole = "assistant"
)
historyMessages := make([]*message.Message, 0, len(messages))
for _, msg := range messages {
if msg.Role == userRole {
rID, err = w.IDGenerator.GenID(ctx)
if err != nil {
return nil, err
}
} else if msg.Role == assistantRole && rID == 0 {
continue
} else {
return nil, fmt.Errorf("invalid role type %v", msg.Role)
}
messageType := ternary.IFElse(msg.Role == userRole, message.MessageTypeQuestion, message.MessageTypeAnswer)
m, err := toConversationMessage(ctx, appID, conversationID, userID, rID, messageType, msg)
if err != nil {
return nil, err
}
historyMessages = append(historyMessages, m)
}
return historyMessages, nil
}
func toConversationMessage(_ context.Context, appID int64, cid int64, userID int64, roundID int64, messageType message.MessageType, msg *workflow.EnterMessage) (*message.Message, error) {
type content struct {
Type string `json:"type"`
FileID *string `json:"file_id"`
Text *string `json:"text"`
}
if msg.ContentType == "text" {
return &message.Message{
Role: schema.User,
ConversationID: cid,
AgentID: appID,
RunID: roundID,
Content: msg.Content,
ContentType: message.ContentTypeText,
MessageType: messageType,
UserID: strconv.FormatInt(userID, 10),
}, nil
} else if msg.ContentType == "object_string" {
contents := make([]*content, 0)
err := sonic.UnmarshalString(msg.Content, &contents)
if err != nil {
return nil, err
}
m := &message.Message{
Role: schema.User,
MessageType: messageType,
ConversationID: cid,
UserID: strconv.FormatInt(userID, 10),
RunID: roundID,
Content: msg.Content,
ContentType: message.ContentTypeMix,
MultiContent: make([]*message.InputMetaData, 0, len(contents)),
}
for _, ct := range contents {
if ct.Text != nil {
m.MultiContent = append(m.MultiContent, &message.InputMetaData{
Type: message.InputTypeText,
Text: *ct.Text,
})
} else if ct.FileID != nil {
m.MultiContent = append(m.MultiContent, &message.InputMetaData{
Type: message.InputType(ct.Type),
FileData: []*message.FileData{
{Url: *ct.FileID},
},
})
} else {
return nil, fmt.Errorf("invalid input type %v", ct.Type)
}
}
return m, nil
} else {
return nil, fmt.Errorf("invalid message content type %v", msg.ContentType)
}
}

View File

@ -259,9 +259,10 @@ func (w *ApplicationService) UpdateWorkflowMeta(ctx context.Context, req *workfl
}
err = GetWorkflowDomainSVC().UpdateMeta(ctx, mustParseInt64(req.GetWorkflowID()), &vo.MetaUpdate{
Name: req.Name,
Desc: req.Desc,
IconURI: req.IconURI,
Name: req.Name,
Desc: req.Desc,
IconURI: req.IconURI,
WorkflowMode: req.FlowMode,
})
if err != nil {
return nil, err
@ -3319,6 +3320,37 @@ func (w *ApplicationService) CopyWkTemplateApi(ctx context.Context, req *workflo
return resp, err
}
func (w *ApplicationService) GetOrCreateConversation(ctx context.Context, req *workflow.GetOrCreateConversationRequest) (*workflow.GetOrCreateConversationResponse, error) {
var (
appID = mustParseInt64(req.GetAppID())
userID = ctxutil.MustGetUIDFromCtx(ctx)
env = ternary.IFElse(req.GetDraftMode(), vo.Draft, vo.Online)
//spaceID = mustParseInt64(req.GetSpaceID())
//_ = spaceID
)
// check permission
var (
err error
cID int64
)
if !req.GetGetOrCreate() {
cID, err = GetWorkflowDomainSVC().UpdateConversation(ctx, env, appID, req.GetConnectorId(), userID, req.GetConversationMame())
} else {
cID, err = GetWorkflowDomainSVC().GetOrCreateConversation(ctx, env, appID, req.GetConnectorId(), userID, req.GetConversationMame())
}
if err != nil {
return nil, err
}
return &workflow.GetOrCreateConversationResponse{
ConversationData: &workflow.ConversationData{
Id: cID,
},
}, nil
}
func mustParseInt64(s string) int64 {
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
@ -4034,3 +4066,62 @@ func (w *ApplicationService) convertChatFlowRole(ctx context.Context, role *enti
return res, nil
}
func (w *ApplicationService) OpenAPIGetWorkflowInfo(ctx context.Context, req *workflow.OpenAPIGetWorkflowInfoRequest) (
_ *workflow.OpenAPIGetWorkflowInfoResponse, err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = safego.NewPanicErr(panicErr, debug.Stack())
}
if err != nil {
err = vo.WrapIfNeeded(errno.ErrChatFlowRoleOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))
}
}()
uID := ctxutil.MustGetUIDFromCtx(ctx)
wf, err := GetWorkflowDomainSVC().Get(ctx, &vo.GetPolicy{
ID: mustParseInt64(req.GetWorkflowID()),
MetaOnly: true,
})
if err != nil {
return nil, err
}
if err = checkUserSpace(ctx, uID, wf.Meta.SpaceID); err != nil {
return nil, err
}
if !IsChatFlow(wf) {
logs.CtxWarnf(ctx, "GetChatFlowRole not chat flow, workflowID: %d", wf.ID)
return nil, vo.WrapError(errno.ErrChatFlowRoleOperationFail, fmt.Errorf("workflow %d is not a chat flow", wf.ID))
}
var version string
if wf.Meta.AppID != nil {
version = "" // TODO : search version from DB using AppID
}
role, err := GetWorkflowDomainSVC().GetChatFlowRole(ctx, mustParseInt64(req.WorkflowID), version)
if err != nil {
return nil, err
}
if role == nil {
logs.CtxWarnf(ctx, "GetChatFlowRole role nil, workflowID: %d", wf.ID)
// Return nil for the error to align with the production behavior,
// where the GET API may be called before the CREATE API during chatflow creation.
return &workflow.OpenAPIGetWorkflowInfoResponse{}, nil
}
wfRole, err := w.convertChatFlowRole(ctx, role)
if err != nil {
return nil, fmt.Errorf("failed to get chat flow role config, internal data processing error: %+v", err)
}
return &workflow.OpenAPIGetWorkflowInfoResponse{
WorkflowInfo: &workflow.WorkflowInfo{
Role: wfRole,
},
}, nil
}

View File

@ -181,7 +181,7 @@ func convertMessage(msgs []*msgentity.Message) ([]*conversation.Message, error)
for _, fd := range c.FileData {
mcs = append(mcs, &conversation.Content{
Type: string(c.Type),
Uri: ptr.Of(fd.Url),
Uri: ptr.Of(fd.URI),
})
}
} else {

View File

@ -65,6 +65,8 @@ type ConversationService interface {
ListDynamicConversation(ctx context.Context, env vo.Env, policy *vo.ListConversationPolicy) ([]*entity.DynamicConversation, error)
ReleaseConversationTemplate(ctx context.Context, appID int64, version string) error
InitApplicationDefaultConversationTemplate(ctx context.Context, spaceID int64, appID int64, userID int64) error
GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
UpdateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
}
type InterruptEventStore interface {
@ -122,4 +124,6 @@ type ConversationRepository interface {
ListDynamicConversation(ctx context.Context, env vo.Env, policy *vo.ListConversationPolicy) ([]*entity.DynamicConversation, error)
BatchCreateOnlineConversationTemplate(ctx context.Context, templates []*entity.ConversationTemplate, version string) error
UpdateDynamicConversationNameByID(ctx context.Context, env vo.Env, templateID int64, name string) error
UpdateStaticConversation(ctx context.Context, env vo.Env, templateID int64, connectorID int64, userID int64, newConversationID int64) error
UpdateDynamicConversation(ctx context.Context, env vo.Env, conversationID, newConversationID int64) error
}

View File

@ -27,6 +27,15 @@ type Canvas struct {
Versions any `json:"versions"`
}
func (c *Canvas) StartNode() *Node {
for _, node := range c.Nodes {
if node.Type == BlockTypeBotStart {
return node
}
}
panic("canvas start node not found")
}
type Node struct {
ID string `json:"id"`
Type BlockType `json:"type"`

View File

@ -0,0 +1,68 @@
package vo
type ChatFlowEvent string
const (
ChatFlowCreated ChatFlowEvent = "conversation.chat.created"
ChatFlowInProgress ChatFlowEvent = "conversation.chat.in_progress"
ChatFlowCompleted ChatFlowEvent = "conversation.chat.completed"
ChatFlowFailed ChatFlowEvent = "conversation.chat.failed"
ChatFlowRequiresAction ChatFlowEvent = "conversation.chat.requires_action"
ChatFlowError ChatFlowEvent = "error"
ChatFlowDone ChatFlowEvent = "done"
ChatFlowMessageDelta ChatFlowEvent = "conversation.message.delta"
ChatFlowMessageCompleted ChatFlowEvent = "conversation.message.completed"
)
type LastError struct {
Code int32 `form:"code,required" json:"code,required"`
Msg string `form:"msg,required" json:"msg,required"`
}
type RequiredAction struct {
Type string `form:"type" json:"type"`
//SubmitToolOutputs *SubmitToolOutputs `form:"submit_tool_outputs" json:"submit_tool_outputs"`
}
type Usage struct {
TokenCount *int32 `form:"token_count" json:"token_count,omitempty"`
OutputTokens *int32 `form:"output_count" json:"output_count,omitempty"`
InputTokens *int32 `form:"input_count" json:"input_count,omitempty"`
}
type Status string
const (
Created Status = "created"
InProgress Status = "in_progress"
Completed Status = "completed"
Failed Status = "failed"
RequiresAction Status = "requires_action"
Canceled Status = "canceled"
)
type ChatFlowDetail struct {
ID string `json:"id,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
BotID string `json:"bot_id,omitempty"`
Status Status `json:"status,omitempty"`
LastError *LastError `json:"last_error,omitempty"`
Usage *Usage `json:"usage,omitempty"`
RequiredAction *RequiredAction `json:"required_action,omitempty"`
ExecuteID string `json:"execute_id,omitempty"`
}
type MessageDetail struct {
ID string `json:"id"`
ChatID string `json:"chat_id"`
ConversationID string `json:"conversation_id"`
BotID string `json:"bot_id"`
Role string `json:"role"`
Type string `json:"type"`
Content string `json:"content"`
ContentType string `json:"content_type"`
}
type ErrorDetail struct {
Code string `form:"code,required" json:"code,required"`
Msg string `form:"msg,required" json:"msg,required"`
DebugUrl string `form:"debug_url" json:"debug_url,omitempty"`
}

View File

@ -37,6 +37,7 @@ type ExecuteConfig struct {
WorkflowMode WorkflowMode
RoundID *int64 // if workflow is chat flow, conversation round id is required
ConversationID *int64 // if workflow is chat flow, conversation id is required
EnterMessage *workflow.EnterMessage
}
type ExecuteMode string

View File

@ -68,6 +68,7 @@ type MetaUpdate struct {
IconURI *string
HasPublished *bool
LatestPublishedVersion *string
WorkflowMode *Mode
}
type MetaQuery struct {

View File

@ -86,7 +86,9 @@ func init() {
_ = compose.RegisterSerializableType[vo.Locator]("wf_locator")
_ = compose.RegisterSerializableType[vo.BizType]("biz_type")
_ = compose.RegisterSerializableType[*variableassigner.AppVariables]("app_variables")
_ = compose.RegisterSerializableType[workflow2.WorkflowMode]("workflow_mode")
_ = compose.RegisterSerializableType[*workflow2.EnterMessage]("workflow_entity_message")
}

View File

@ -724,3 +724,60 @@ func (r *RepositoryImpl) UpdateDynamicConversationNameByID(ctx context.Context,
}
}
func (r *RepositoryImpl) UpdateStaticConversation(ctx context.Context, env vo.Env, templateID int64, connectorID int64, userID int64, newConversationID int64) error {
if env == vo.Draft {
appStaticConversationDraft := r.query.AppStaticConversationDraft
_, err := appStaticConversationDraft.WithContext(ctx).Where(
appStaticConversationDraft.TemplateID.Eq(templateID),
appStaticConversationDraft.ConnectorID.Eq(connectorID),
appStaticConversationDraft.UserID.Eq(userID),
).UpdateColumn(appStaticConversationDraft.ConversationID, newConversationID)
if err != nil {
return err
}
return err
} else if env == vo.Online {
appStaticConversationOnline := r.query.AppStaticConversationOnline
_, err := appStaticConversationOnline.WithContext(ctx).Where(
appStaticConversationOnline.TemplateID.Eq(templateID),
appStaticConversationOnline.ConnectorID.Eq(connectorID),
appStaticConversationOnline.UserID.Eq(userID),
).UpdateColumn(appStaticConversationOnline.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("unknown env %v", env)
}
}
func (r *RepositoryImpl) UpdateDynamicConversation(ctx context.Context, env vo.Env, conversationID, newConversationID int64) error {
if env == vo.Draft {
appDynamicConversationDraft := r.query.AppDynamicConversationDraft
_, err := appDynamicConversationDraft.WithContext(ctx).Where(appDynamicConversationDraft.ConversationID.Eq(conversationID)).
UpdateColumn(appDynamicConversationDraft.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else if env == vo.Online {
appDynamicConversationOnline := r.query.AppDynamicConversationOnline
_, err := appDynamicConversationOnline.WithContext(ctx).Where(appDynamicConversationOnline.ConversationID.Eq(conversationID)).
UpdateColumn(appDynamicConversationOnline.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("unknown env %v", env)
}
}

View File

@ -497,6 +497,10 @@ func (r *RepositoryImpl) UpdateMeta(ctx context.Context, id int64, metaUpdate *v
expressions = append(expressions, r.query.WorkflowMeta.LatestVersion.Value(*metaUpdate.LatestPublishedVersion))
}
if metaUpdate.WorkflowMode != nil {
expressions = append(expressions, r.query.WorkflowMeta.Mode.Value(int32(*metaUpdate.WorkflowMode)))
}
if len(expressions) == 0 {
return nil
}

View File

@ -22,6 +22,7 @@ import (
cloudworkflow "github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
@ -335,3 +336,91 @@ func (c *conversationImpl) replaceWorkflowsConversationName(ctx context.Context,
func (c *conversationImpl) DeleteDynamicConversation(ctx context.Context, env vo.Env, templateID int64) (int64, error) {
return c.repo.DeleteDynamicConversation(ctx, env, templateID)
}
func (c *conversationImpl) GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error) {
t, existed, err := c.repo.GetConversationTemplate(ctx, env, vo.GetConversationTemplatePolicy{
AppID: ptr.Of(appID),
Name: ptr.Of(conversationName),
})
if err != nil {
return 0, err
}
conversationIDGenerator := workflow.ConversationIDGenerator(func(ctx context.Context, appID int64, userID, connectorID int64) (int64, error) {
return conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
})
if existed {
conversationID, _, err := c.repo.GetOrCreateStaticConversation(ctx, env, conversationIDGenerator, &vo.CreateStaticConversation{
AppID: appID,
ConnectorID: connectorID,
UserID: userID,
TemplateID: t.TemplateID,
})
if err != nil {
return 0, err
}
return conversationID, nil
}
conversationID, _, err := c.repo.GetOrCreateDynamicConversation(ctx, env, conversationIDGenerator, &vo.CreateDynamicConversation{
AppID: appID,
ConnectorID: connectorID,
UserID: userID,
Name: conversationName,
})
if err != nil {
return 0, err
}
return conversationID, nil
}
func (c *conversationImpl) UpdateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error) {
t, existed, err := c.repo.GetConversationTemplate(ctx, env, vo.GetConversationTemplatePolicy{
AppID: ptr.Of(appID),
Name: ptr.Of(conversationName),
})
if err != nil {
return 0, err
}
if existed {
newConversationID, err := conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
err = c.repo.UpdateStaticConversation(ctx, env, t.TemplateID, connectorID, userID, newConversationID)
if err != nil {
return 0, err
}
return newConversationID, nil
}
dy, existed, err := c.repo.GetDynamicConversationByName(ctx, env, appID, connectorID, userID, conversationName)
if err != nil {
return 0, err
}
if !existed {
return 0, fmt.Errorf("conversation name %v not found", conversationName)
}
newConversationID, err := conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
err = c.repo.UpdateDynamicConversation(ctx, env, dy.ConversationID, newConversationID)
if err != nil {
return 0, err
}
return newConversationID, nil
}

View File

@ -380,6 +380,8 @@ func (i *impl) StreamExecute(ctx context.Context, config vo.ExecuteConfig, input
return nil, err
}
config.WorkflowMode = wfEntity.Mode
isApplicationWorkflow := wfEntity.AppID != nil
if isApplicationWorkflow && config.Mode == vo.ExecuteModeRelease {
err = i.checkApplicationWorkflowReleaseVersion(ctx, *wfEntity.AppID, config.ConnectorID, config.ID, config.Version)

View File

@ -766,6 +766,13 @@ func (i *impl) UpdateMeta(ctx context.Context, id int64, metaUpdate *vo.MetaUpda
return err
}
if metaUpdate.WorkflowMode != nil && *metaUpdate.WorkflowMode == cloudworkflow.WorkflowMode_ChatFlow {
err = i.adaptToChatFlow(ctx, id)
if err != nil {
return err
}
}
err = search.GetNotifier().PublishWorkflowResource(ctx, search.Updated, &search.Resource{
WorkflowID: id,
URI: metaUpdate.IconURI,
@ -1920,3 +1927,48 @@ func replaceRelatedWorkflowOrExternalResourceInWorkflowNodes(nodes []*vo.Node, r
}
return nil
}
func (i *impl) adaptToChatFlow(ctx context.Context, wID int64) error {
wfEntity, err := i.repo.GetEntity(ctx, &vo.GetPolicy{
ID: wID,
QType: vo.FromDraft,
})
if err != nil {
return err
}
canvas := &vo.Canvas{}
err = sonic.UnmarshalString(wfEntity.Canvas, canvas)
if err != nil {
return err
}
startNode := canvas.StartNode()
vMap := make(map[string]bool)
for _, o := range startNode.Data.Outputs {
v, err := vo.ParseVariable(o)
if err != nil {
return err
}
vMap[v.Name] = true
}
if _, ok := vMap["USER_INPUT"]; !ok {
startNode.Data.Outputs = append(startNode.Data.Outputs, &vo.Variable{
Name: "USER_INPUT",
Type: vo.VariableTypeString,
})
}
if _, ok := vMap["CONVERSATION_NAME"]; !ok {
startNode.Data.Outputs = append(startNode.Data.Outputs, &vo.Variable{
Name: "CONVERSATION_NAME",
Type: vo.VariableTypeString,
DefaultValue: "Default",
})
}
canvasStr, err := sonic.MarshalString(canvas)
if err != nil {
return err
}
return i.Save(ctx, wID, canvasStr)
}

View File

@ -65,6 +65,9 @@ service WorkflowService {
workflow.GetChatFlowRoleResponse GetChatFlowRole(1: workflow.GetChatFlowRoleRequest request) (api.get='/api/workflow_api/chat_flow_role/get', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
workflow.CreateChatFlowRoleResponse CreateChatFlowRole(1: workflow.CreateChatFlowRoleRequest request) (api.post='/api/workflow_api/chat_flow_role/create', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
workflow.DeleteChatFlowRoleResponse DeleteChatFlowRole(1: workflow.DeleteChatFlowRoleRequest request) (api.post='/api/workflow_api/chat_flow_role/delete', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
workflow.GetOrCreateConversationResponse GetOrCreateConversation(1: workflow.GetOrCreateConversationRequest request)(api.post = "/api/workflow_api/conversation/create", api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
// App 发布管理
workflow.ListPublishWorkflowResponse ListPublishWorkflow(1: workflow.ListPublishWorkflowRequest request) (api.post='/api/workflow_api/list_publish_workflow', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")

View File

@ -2200,3 +2200,34 @@ struct OpenAPIGetWorkflowInfoResponse{
255: required base.BaseResp BaseResp
}
struct GetOrCreateConversationRequest {
1: optional map<string,string> MetaData (api.body = "meta_data") //自定义透传字段
3: optional i64 BotId (api.body = "bot_id", api.js_conv="true")
4: optional i64 ConnectorId (api.body= "connector_id", api.js_conv="true")
5: optional string SpaceID (api.body= "space_id", api.js_conv="true")
9 : optional string AppID (go.tag="json:\"app_id\"")
10: optional string WorkflowID (go.tag="json:\"workflow_id\"")
11: optional string ConversationMame (go.tag="json:\"conversation_name\"")
12: optional bool GetOrCreate (go.tag="json:\"get_or_create\"")
13: optional bool DraftMode (go.tag="json:\"draft_mode\"")
255: optional base.Base Base
}
struct GetOrCreateConversationResponse {
1: i64 code
2: string msg
3: optional ConversationData ConversationData (api.body = "data")
}
struct ConversationData {
1: i64 Id (api.body = "id", agw.key = "id", api.js_conv="true")
2: i64 CreatedAt (api.body = "created_at", agw.key = "created_at")
3: map<string,string> MetaData (api.body = "meta_data", agw.key = "meta_data")
4: optional i64 CreatorID (api.body = "creator_d", agw.key = "creator_d", api.js_conv="true")
5: optional i64 ConnectorID (api.body = "connector_id", agw.key="connector_id", api.js_conv="true")
6: optional i64 LastSectionID (api.body="last_section_id", api.js_conv="true")
7: optional i64 AccountID (api.body = "account_id")
}

View File

@ -81,4 +81,4 @@ if [ -f "$WORKFLOW_SANBOX" ]; then
else
echo "$WORKFLOW_SANBOX file not found"
exit 1
fi
fi