feat(backend): support open api for chat flow run

This commit is contained in:
zhuangjie.1125
2025-07-31 15:40:27 +08:00
committed by lvxinyu.1117
parent 9fdb3ecc79
commit f9bbdffa02
27 changed files with 3494 additions and 12 deletions

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/hertz/pkg/app"
@ -1094,9 +1095,62 @@ func OpenAPIChatFlowRun(ctx context.Context, c *app.RequestContext) {
return
}
resp := new(workflow.ChatFlowRunResponse)
w := sse.NewWriter(c)
c.SetContentType("text/event-stream; charset=utf-8")
c.Response.Header.Set("Cache-Control", "no-cache")
c.Response.Header.Set("Connection", "keep-alive")
c.Response.Header.Set("Access-Control-Allow-Origin", "*")
c.JSON(consts.StatusOK, resp)
sr, err := appworkflow.SVC.OpenAPIChatFlowRun(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
sendChatFlowStreamRunSSE(ctx, w, sr)
}
func sendChatFlowStreamRunSSE(ctx context.Context, w *sse.Writer, sr *schema.StreamReader[[]*workflow.ChatFlowRunResponse]) {
defer func() {
_ = w.Close()
sr.Close()
}()
seq := int64(1)
for {
respList, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
event := &sse.Event{
Type: "error",
Data: []byte(err.Error()),
}
if err = w.Write(event); err != nil {
logs.CtxErrorf(ctx, "publish stream event failed, err:%v", err)
}
return
}
for _, resp := range respList {
event := &sse.Event{
ID: strconv.FormatInt(seq, 10),
Type: resp.Event,
Data: []byte(resp.Data),
}
if err = w.Write(event); err != nil {
logs.CtxErrorf(ctx, "publish stream event failed, err:%v", err)
return
}
seq++
}
}
}
// OpenAPIGetWorkflowInfo .
@ -1110,7 +1164,11 @@ func OpenAPIGetWorkflowInfo(ctx context.Context, c *app.RequestContext) {
return
}
resp := new(workflow.OpenAPIGetWorkflowInfoResponse)
resp, err := appworkflow.SVC.OpenAPIGetWorkflowInfo(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
c.JSON(consts.StatusOK, resp)
}
@ -1154,3 +1212,22 @@ func GetExampleWorkFlowList(ctx context.Context, c *app.RequestContext) {
c.JSON(consts.StatusOK, resp)
}
// OpenAPICreateConversation .
// @router /v1/workflow/conversation/create [POST]
func OpenAPICreateConversation(ctx context.Context, c *app.RequestContext) {
var err error
var req workflow.CreateConversationRequest
err = c.BindAndValidate(&req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
resp, err := appworkflow.SVC.OpenAPICreateConversation(ctx, &req)
if err != nil {
internalServerErrorResponse(ctx, c, err)
return
}
c.JSON(consts.StatusOK, resp)
}

View File

@ -1,3 +1,19 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Code generated by thriftgo (0.4.1). DO NOT EDIT.
package common
@ -52,6 +68,8 @@ func (p Scene) String() string {
return "GenerateAgentInfo"
case Scene_SceneOpenApi:
return "SceneOpenApi"
case Scene_SceneWorkflow:
return "SceneWorkflow"
}
return "<UNSET>"
}
@ -78,6 +96,8 @@ func SceneFromString(s string) (Scene, error) {
return Scene_GenerateAgentInfo, nil
case "SceneOpenApi":
return Scene_SceneOpenApi, nil
case "SceneWorkflow":
return Scene_SceneWorkflow, nil
}
return Scene(0), fmt.Errorf("not a valid Scene string")
}

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +1,19 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Code generated by thriftgo (0.4.1). DO NOT EDIT.
package workflow
@ -106,6 +122,8 @@ type WorkflowService interface {
OpenAPIChatFlowRun(ctx context.Context, request *ChatFlowRunRequest) (r *ChatFlowRunResponse, err error)
OpenAPIGetWorkflowInfo(ctx context.Context, request *OpenAPIGetWorkflowInfoRequest) (r *OpenAPIGetWorkflowInfoResponse, err error)
OpenAPICreateConversation(ctx context.Context, request *CreateConversationRequest) (r *CreateConversationRequest, err error)
}
type WorkflowServiceClient struct {
@ -566,6 +584,15 @@ func (p *WorkflowServiceClient) OpenAPIGetWorkflowInfo(ctx context.Context, requ
}
return _result.GetSuccess(), nil
}
func (p *WorkflowServiceClient) OpenAPICreateConversation(ctx context.Context, request *CreateConversationRequest) (r *CreateConversationRequest, err error) {
var _args WorkflowServiceOpenAPICreateConversationArgs
_args.Request = request
var _result WorkflowServiceOpenAPICreateConversationResult
if err = p.Client_().Call(ctx, "OpenAPICreateConversation", &_args, &_result); err != nil {
return
}
return _result.GetSuccess(), nil
}
type WorkflowServiceProcessor struct {
processorMap map[string]thrift.TProcessorFunction
@ -635,6 +662,7 @@ func NewWorkflowServiceProcessor(handler WorkflowService) *WorkflowServiceProces
self.AddToProcessorMap("OpenAPIGetWorkflowRunHistory", &workflowServiceProcessorOpenAPIGetWorkflowRunHistory{handler: handler})
self.AddToProcessorMap("OpenAPIChatFlowRun", &workflowServiceProcessorOpenAPIChatFlowRun{handler: handler})
self.AddToProcessorMap("OpenAPIGetWorkflowInfo", &workflowServiceProcessorOpenAPIGetWorkflowInfo{handler: handler})
self.AddToProcessorMap("OpenAPICreateConversation", &workflowServiceProcessorOpenAPICreateConversation{handler: handler})
return self
}
func (p *WorkflowServiceProcessor) Process(ctx context.Context, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
@ -2959,6 +2987,54 @@ func (p *workflowServiceProcessorOpenAPIGetWorkflowInfo) Process(ctx context.Con
return true, err
}
type workflowServiceProcessorOpenAPICreateConversation struct {
handler WorkflowService
}
func (p *workflowServiceProcessorOpenAPICreateConversation) Process(ctx context.Context, seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
args := WorkflowServiceOpenAPICreateConversationArgs{}
if err = args.Read(iprot); err != nil {
iprot.ReadMessageEnd()
x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
oprot.WriteMessageBegin("OpenAPICreateConversation", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return false, err
}
iprot.ReadMessageEnd()
var err2 error
result := WorkflowServiceOpenAPICreateConversationResult{}
var retval *CreateConversationRequest
if retval, err2 = p.handler.OpenAPICreateConversation(ctx, args.Request); err2 != nil {
x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing OpenAPICreateConversation: "+err2.Error())
oprot.WriteMessageBegin("OpenAPICreateConversation", thrift.EXCEPTION, seqId)
x.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush(ctx)
return true, err2
} else {
result.Success = retval
}
if err2 = oprot.WriteMessageBegin("OpenAPICreateConversation", thrift.REPLY, seqId); err2 != nil {
err = err2
}
if err2 = result.Write(oprot); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
err = err2
}
if err2 = oprot.Flush(ctx); err == nil && err2 != nil {
err = err2
}
if err != nil {
return
}
return true, err
}
type WorkflowServiceCreateWorkflowArgs struct {
Request *CreateWorkflowRequest `thrift:"request,1"`
}
@ -16974,3 +17050,295 @@ func (p *WorkflowServiceOpenAPIGetWorkflowInfoResult) String() string {
return fmt.Sprintf("WorkflowServiceOpenAPIGetWorkflowInfoResult(%+v)", *p)
}
type WorkflowServiceOpenAPICreateConversationArgs struct {
Request *CreateConversationRequest `thrift:"request,1"`
}
func NewWorkflowServiceOpenAPICreateConversationArgs() *WorkflowServiceOpenAPICreateConversationArgs {
return &WorkflowServiceOpenAPICreateConversationArgs{}
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) InitDefault() {
}
var WorkflowServiceOpenAPICreateConversationArgs_Request_DEFAULT *CreateConversationRequest
func (p *WorkflowServiceOpenAPICreateConversationArgs) GetRequest() (v *CreateConversationRequest) {
if !p.IsSetRequest() {
return WorkflowServiceOpenAPICreateConversationArgs_Request_DEFAULT
}
return p.Request
}
var fieldIDToName_WorkflowServiceOpenAPICreateConversationArgs = map[int16]string{
1: "request",
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) IsSetRequest() bool {
return p.Request != nil
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) Read(iprot thrift.TProtocol) (err error) {
var fieldTypeId thrift.TType
var fieldId int16
if _, err = iprot.ReadStructBegin(); err != nil {
goto ReadStructBeginError
}
for {
_, fieldTypeId, fieldId, err = iprot.ReadFieldBegin()
if err != nil {
goto ReadFieldBeginError
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 1:
if fieldTypeId == thrift.STRUCT {
if err = p.ReadField1(iprot); err != nil {
goto ReadFieldError
}
} else if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
default:
if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
}
if err = iprot.ReadFieldEnd(); err != nil {
goto ReadFieldEndError
}
}
if err = iprot.ReadStructEnd(); err != nil {
goto ReadStructEndError
}
return nil
ReadStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T read struct begin error: ", p), err)
ReadFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err)
ReadFieldError:
return thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_WorkflowServiceOpenAPICreateConversationArgs[fieldId]), err)
SkipFieldError:
return thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err)
ReadFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T read field end error", p), err)
ReadStructEndError:
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) ReadField1(iprot thrift.TProtocol) error {
_field := NewCreateConversationRequest()
if err := _field.Read(iprot); err != nil {
return err
}
p.Request = _field
return nil
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) Write(oprot thrift.TProtocol) (err error) {
var fieldId int16
if err = oprot.WriteStructBegin("OpenAPICreateConversation_args"); err != nil {
goto WriteStructBeginError
}
if p != nil {
if err = p.writeField1(oprot); err != nil {
fieldId = 1
goto WriteFieldError
}
}
if err = oprot.WriteFieldStop(); err != nil {
goto WriteFieldStopError
}
if err = oprot.WriteStructEnd(); err != nil {
goto WriteStructEndError
}
return nil
WriteStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
WriteFieldError:
return thrift.PrependError(fmt.Sprintf("%T write field %d error: ", p, fieldId), err)
WriteFieldStopError:
return thrift.PrependError(fmt.Sprintf("%T write field stop error: ", p), err)
WriteStructEndError:
return thrift.PrependError(fmt.Sprintf("%T write struct end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) writeField1(oprot thrift.TProtocol) (err error) {
if err = oprot.WriteFieldBegin("request", thrift.STRUCT, 1); err != nil {
goto WriteFieldBeginError
}
if err := p.Request.Write(oprot); err != nil {
return err
}
if err = oprot.WriteFieldEnd(); err != nil {
goto WriteFieldEndError
}
return nil
WriteFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T write field 1 begin error: ", p), err)
WriteFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T write field 1 end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationArgs) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("WorkflowServiceOpenAPICreateConversationArgs(%+v)", *p)
}
type WorkflowServiceOpenAPICreateConversationResult struct {
Success *CreateConversationRequest `thrift:"success,0,optional"`
}
func NewWorkflowServiceOpenAPICreateConversationResult() *WorkflowServiceOpenAPICreateConversationResult {
return &WorkflowServiceOpenAPICreateConversationResult{}
}
func (p *WorkflowServiceOpenAPICreateConversationResult) InitDefault() {
}
var WorkflowServiceOpenAPICreateConversationResult_Success_DEFAULT *CreateConversationRequest
func (p *WorkflowServiceOpenAPICreateConversationResult) GetSuccess() (v *CreateConversationRequest) {
if !p.IsSetSuccess() {
return WorkflowServiceOpenAPICreateConversationResult_Success_DEFAULT
}
return p.Success
}
var fieldIDToName_WorkflowServiceOpenAPICreateConversationResult = map[int16]string{
0: "success",
}
func (p *WorkflowServiceOpenAPICreateConversationResult) IsSetSuccess() bool {
return p.Success != nil
}
func (p *WorkflowServiceOpenAPICreateConversationResult) Read(iprot thrift.TProtocol) (err error) {
var fieldTypeId thrift.TType
var fieldId int16
if _, err = iprot.ReadStructBegin(); err != nil {
goto ReadStructBeginError
}
for {
_, fieldTypeId, fieldId, err = iprot.ReadFieldBegin()
if err != nil {
goto ReadFieldBeginError
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 0:
if fieldTypeId == thrift.STRUCT {
if err = p.ReadField0(iprot); err != nil {
goto ReadFieldError
}
} else if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
default:
if err = iprot.Skip(fieldTypeId); err != nil {
goto SkipFieldError
}
}
if err = iprot.ReadFieldEnd(); err != nil {
goto ReadFieldEndError
}
}
if err = iprot.ReadStructEnd(); err != nil {
goto ReadStructEndError
}
return nil
ReadStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T read struct begin error: ", p), err)
ReadFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T read field %d begin error: ", p, fieldId), err)
ReadFieldError:
return thrift.PrependError(fmt.Sprintf("%T read field %d '%s' error: ", p, fieldId, fieldIDToName_WorkflowServiceOpenAPICreateConversationResult[fieldId]), err)
SkipFieldError:
return thrift.PrependError(fmt.Sprintf("%T field %d skip type %d error: ", p, fieldId, fieldTypeId), err)
ReadFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T read field end error", p), err)
ReadStructEndError:
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationResult) ReadField0(iprot thrift.TProtocol) error {
_field := NewCreateConversationRequest()
if err := _field.Read(iprot); err != nil {
return err
}
p.Success = _field
return nil
}
func (p *WorkflowServiceOpenAPICreateConversationResult) Write(oprot thrift.TProtocol) (err error) {
var fieldId int16
if err = oprot.WriteStructBegin("OpenAPICreateConversation_result"); err != nil {
goto WriteStructBeginError
}
if p != nil {
if err = p.writeField0(oprot); err != nil {
fieldId = 0
goto WriteFieldError
}
}
if err = oprot.WriteFieldStop(); err != nil {
goto WriteFieldStopError
}
if err = oprot.WriteStructEnd(); err != nil {
goto WriteStructEndError
}
return nil
WriteStructBeginError:
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
WriteFieldError:
return thrift.PrependError(fmt.Sprintf("%T write field %d error: ", p, fieldId), err)
WriteFieldStopError:
return thrift.PrependError(fmt.Sprintf("%T write field stop error: ", p), err)
WriteStructEndError:
return thrift.PrependError(fmt.Sprintf("%T write struct end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationResult) writeField0(oprot thrift.TProtocol) (err error) {
if p.IsSetSuccess() {
if err = oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil {
goto WriteFieldBeginError
}
if err := p.Success.Write(oprot); err != nil {
return err
}
if err = oprot.WriteFieldEnd(); err != nil {
goto WriteFieldEndError
}
}
return nil
WriteFieldBeginError:
return thrift.PrependError(fmt.Sprintf("%T write field 0 begin error: ", p), err)
WriteFieldEndError:
return thrift.PrependError(fmt.Sprintf("%T write field 0 end error: ", p), err)
}
func (p *WorkflowServiceOpenAPICreateConversationResult) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("WorkflowServiceOpenAPICreateConversationResult(%+v)", *p)
}

View File

@ -1,3 +1,19 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Code generated by hertz generator. DO NOT EDIT.
package coze
@ -438,6 +454,10 @@ func Register(r *server.Hertz) {
_workflow.POST("/run", append(_openapirunflowMw(), coze.OpenAPIRunFlow)...)
_workflow.POST("/stream_resume", append(_openapistreamresumeflowMw(), coze.OpenAPIStreamResumeFlow)...)
_workflow.POST("/stream_run", append(_openapistreamrunflowMw(), coze.OpenAPIStreamRunFlow)...)
{
_conversation1 := _workflow.Group("/conversation", _conversation1Mw()...)
_conversation1.POST("/create", append(_openapicreateconversationMw(), coze.OpenAPICreateConversation)...)
}
}
{
_workflows := _v1.Group("/workflows", _workflowsMw()...)

View File

@ -1505,3 +1505,18 @@ func _upload1Mw() []app.HandlerFunc {
// your code...
return nil
}
func _getorcreateconversationMw() []app.HandlerFunc {
// your code...
return nil
}
func _conversation1Mw() []app.HandlerFunc {
// your code...
return nil
}
func _openapicreateconversationMw() []app.HandlerFunc {
// your code...
return nil
}

View File

@ -20,6 +20,7 @@ package router
import (
"context"
"net/http"
"os"
"path"
"strings"
@ -74,4 +75,17 @@ func staticFileRegister(r *server.Hertz) {
ctx.File(staticFile)
})
r.Handle(http.MethodPost, "api/permission_api/coze_web_app/impersonate_coze_user", func(c context.Context, ctx *app.RequestContext) {
ctx.JSON(200, map[string]any{
"code": 0,
"msg": "ok",
"data": map[string]interface{}{
"access_token": "pat_ab1212d883ba6e0a63c28dd47a6e13629f7d5885b78b55d933e9caf9aae737c2",
"expires_in": 1753998220,
"token_type": "Bearer",
},
})
})
}

File diff suppressed because it is too large Load Diff

View File

@ -255,9 +255,10 @@ func (w *ApplicationService) UpdateWorkflowMeta(ctx context.Context, req *workfl
}
err = GetWorkflowDomainSVC().UpdateMeta(ctx, mustParseInt64(req.GetWorkflowID()), &vo.MetaUpdate{
Name: req.Name,
Desc: req.Desc,
IconURI: req.IconURI,
Name: req.Name,
Desc: req.Desc,
IconURI: req.IconURI,
WorkflowMode: req.FlowMode,
})
if err != nil {
return nil, err
@ -4027,3 +4028,62 @@ func (w *ApplicationService) convertChatFlowRole(ctx context.Context, role *enti
return res, nil
}
func (w *ApplicationService) OpenAPIGetWorkflowInfo(ctx context.Context, req *workflow.OpenAPIGetWorkflowInfoRequest) (
_ *workflow.OpenAPIGetWorkflowInfoResponse, err error) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = safego.NewPanicErr(panicErr, debug.Stack())
}
if err != nil {
err = vo.WrapIfNeeded(errno.ErrChatFlowRoleOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))
}
}()
uID := ctxutil.MustGetUIDFromCtx(ctx)
wf, err := GetWorkflowDomainSVC().Get(ctx, &vo.GetPolicy{
ID: mustParseInt64(req.GetWorkflowID()),
MetaOnly: true,
})
if err != nil {
return nil, err
}
if err = checkUserSpace(ctx, uID, wf.Meta.SpaceID); err != nil {
return nil, err
}
if !IsChatFlow(wf) {
logs.CtxWarnf(ctx, "GetChatFlowRole not chat flow, workflowID: %d", wf.ID)
return nil, vo.WrapError(errno.ErrChatFlowRoleOperationFail, fmt.Errorf("workflow %d is not a chat flow", wf.ID))
}
var version string
if wf.Meta.AppID != nil {
version = "" // TODO : search version from DB using AppID
}
role, err := GetWorkflowDomainSVC().GetChatFlowRole(ctx, mustParseInt64(req.WorkflowID), version)
if err != nil {
return nil, err
}
if role == nil {
logs.CtxWarnf(ctx, "GetChatFlowRole role nil, workflowID: %d", wf.ID)
// Return nil for the error to align with the production behavior,
// where the GET API may be called before the CREATE API during chatflow creation.
return &workflow.OpenAPIGetWorkflowInfoResponse{}, nil
}
wfRole, err := w.convertChatFlowRole(ctx, role)
if err != nil {
return nil, fmt.Errorf("failed to get chat flow role config, internal data processing error: %+v", err)
}
return &workflow.OpenAPIGetWorkflowInfoResponse{
WorkflowInfo: &workflow.WorkflowInfo{
Role: wfRole,
},
}, nil
}

View File

@ -906,4 +906,4 @@
- tool_id: 220009
deprecated: false
method: get
sub_url: /v5/place/around
sub_url: /v5/place/around

View File

@ -181,7 +181,7 @@ func convertMessage(msgs []*msgentity.Message) ([]*conversation.Message, error)
for _, fd := range c.FileData {
mcs = append(mcs, &conversation.Content{
Type: string(c.Type),
Uri: ptr.Of(fd.Url),
Uri: ptr.Of(fd.URI),
})
}
} else {

View File

@ -32,7 +32,9 @@ type Executable interface {
AsyncExecute(ctx context.Context, config vo.ExecuteConfig, input map[string]any) (int64, error)
AsyncExecuteNode(ctx context.Context, nodeID string, config vo.ExecuteConfig, input map[string]any) (int64, error)
AsyncResume(ctx context.Context, req *entity.ResumeRequest, config vo.ExecuteConfig) error
StreamExecute(ctx context.Context, config vo.ExecuteConfig, input map[string]any) (*schema.StreamReader[*entity.Message], error)
StreamResume(ctx context.Context, req *entity.ResumeRequest, config vo.ExecuteConfig) (
*schema.StreamReader[*entity.Message], error)
@ -65,6 +67,8 @@ type ConversationService interface {
ListDynamicConversation(ctx context.Context, env vo.Env, policy *vo.ListConversationPolicy) ([]*entity.DynamicConversation, error)
ReleaseConversationTemplate(ctx context.Context, appID int64, version string) error
InitApplicationDefaultConversationTemplate(ctx context.Context, spaceID int64, appID int64, userID int64) error
GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
UpdateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error)
}
type InterruptEventStore interface {
@ -73,6 +77,9 @@ type InterruptEventStore interface {
UpdateFirstInterruptEvent(ctx context.Context, wfExeID int64, event *entity.InterruptEvent) error
PopFirstInterruptEvent(ctx context.Context, wfExeID int64) (*entity.InterruptEvent, bool, error)
ListInterruptEvents(ctx context.Context, wfExeID int64) ([]*entity.InterruptEvent, error)
BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) error
GetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error)
}
type CancelSignalStore interface {
@ -122,4 +129,6 @@ type ConversationRepository interface {
ListDynamicConversation(ctx context.Context, env vo.Env, policy *vo.ListConversationPolicy) ([]*entity.DynamicConversation, error)
BatchCreateOnlineConversationTemplate(ctx context.Context, templates []*entity.ConversationTemplate, version string) error
UpdateDynamicConversationNameByID(ctx context.Context, env vo.Env, templateID int64, name string) error
UpdateStaticConversation(ctx context.Context, env vo.Env, templateID int64, connectorID int64, userID int64, newConversationID int64) error
UpdateDynamicConversation(ctx context.Context, env vo.Env, conversationID, newConversationID int64) error
}

View File

@ -72,3 +72,9 @@ type ToolInterruptEvent struct {
ExecuteID int64
*InterruptEvent
}
type ConvRelatedInfo struct {
EventID int64
ExecID int64
NodeType NodeType
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2025 coze-dev Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vo
type ChatFlowEvent string
const (
ChatFlowCreated ChatFlowEvent = "conversation.chat.created"
ChatFlowInProgress ChatFlowEvent = "conversation.chat.in_progress"
ChatFlowCompleted ChatFlowEvent = "conversation.chat.completed"
ChatFlowFailed ChatFlowEvent = "conversation.chat.failed"
ChatFlowRequiresAction ChatFlowEvent = "conversation.chat.requires_action"
ChatFlowError ChatFlowEvent = "error"
ChatFlowDone ChatFlowEvent = "done"
ChatFlowMessageDelta ChatFlowEvent = "conversation.message.delta"
ChatFlowMessageCompleted ChatFlowEvent = "conversation.message.completed"
)
type Usage struct {
TokenCount *int32 `form:"token_count" json:"token_count,omitempty"`
OutputTokens *int32 `form:"output_count" json:"output_count,omitempty"`
InputTokens *int32 `form:"input_count" json:"input_count,omitempty"`
}
type Status string
const (
Created Status = "created"
InProgress Status = "in_progress"
Completed Status = "completed"
Failed Status = "failed"
RequiresAction Status = "requires_action"
Canceled Status = "canceled"
)
type ChatFlowDetail struct {
ID string `json:"id,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
BotID string `json:"bot_id,omitempty"`
Status Status `json:"status,omitempty"`
Usage *Usage `json:"usage,omitempty"`
ExecuteID string `json:"execute_id,omitempty"`
}
type MessageDetail struct {
ID string `json:"id"`
ChatID string `json:"chat_id"`
ConversationID string `json:"conversation_id"`
BotID string `json:"bot_id"`
Role string `json:"role"`
Type string `json:"type"`
Content string `json:"content"`
ContentType string `json:"content_type"`
}
type ErrorDetail struct {
Code string `form:"code,required" json:"code,required"`
Msg string `form:"msg,required" json:"msg,required"`
DebugUrl string `form:"debug_url" json:"debug_url,omitempty"`
}

View File

@ -16,7 +16,10 @@
package vo
import "github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
import (
"github.com/cloudwego/eino/schema"
"github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
)
type ExecuteConfig struct {
ID int64
@ -37,6 +40,7 @@ type ExecuteConfig struct {
WorkflowMode WorkflowMode
RoundID *int64 // if workflow is chat flow, conversation round id is required
ConversationID *int64 // if workflow is chat flow, conversation id is required
UserMessage *schema.Message
}
type ExecuteMode string

View File

@ -68,6 +68,7 @@ type MetaUpdate struct {
IconURI *string
HasPublished *bool
LatestPublishedVersion *string
WorkflowMode *Mode
}
type MetaQuery struct {

View File

@ -65,6 +65,8 @@ type Service interface {
SyncRelatedWorkflowResources(ctx context.Context, appID int64, relatedWorkflows map[int64]entity.IDVersionPair, related vo.ExternalResourceRelated) error
ConversationService
BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) error
GetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error)
}
type Repository interface {

View File

@ -87,6 +87,7 @@ func init() {
_ = compose.RegisterSerializableType[vo.BizType]("biz_type")
_ = compose.RegisterSerializableType[*execute.AppVariables]("app_variables")
_ = compose.RegisterSerializableType[workflow2.WorkflowMode]("workflow_mode")
_ = compose.RegisterSerializableType[*schema.Message]("schema_message")
}

View File

@ -724,3 +724,60 @@ func (r *RepositoryImpl) UpdateDynamicConversationNameByID(ctx context.Context,
}
}
func (r *RepositoryImpl) UpdateStaticConversation(ctx context.Context, env vo.Env, templateID int64, connectorID int64, userID int64, newConversationID int64) error {
if env == vo.Draft {
appStaticConversationDraft := r.query.AppStaticConversationDraft
_, err := appStaticConversationDraft.WithContext(ctx).Where(
appStaticConversationDraft.TemplateID.Eq(templateID),
appStaticConversationDraft.ConnectorID.Eq(connectorID),
appStaticConversationDraft.UserID.Eq(userID),
).UpdateColumn(appStaticConversationDraft.ConversationID, newConversationID)
if err != nil {
return err
}
return err
} else if env == vo.Online {
appStaticConversationOnline := r.query.AppStaticConversationOnline
_, err := appStaticConversationOnline.WithContext(ctx).Where(
appStaticConversationOnline.TemplateID.Eq(templateID),
appStaticConversationOnline.ConnectorID.Eq(connectorID),
appStaticConversationOnline.UserID.Eq(userID),
).UpdateColumn(appStaticConversationOnline.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("unknown env %v", env)
}
}
func (r *RepositoryImpl) UpdateDynamicConversation(ctx context.Context, env vo.Env, conversationID, newConversationID int64) error {
if env == vo.Draft {
appDynamicConversationDraft := r.query.AppDynamicConversationDraft
_, err := appDynamicConversationDraft.WithContext(ctx).Where(appDynamicConversationDraft.ConversationID.Eq(conversationID)).
UpdateColumn(appDynamicConversationDraft.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else if env == vo.Online {
appDynamicConversationOnline := r.query.AppDynamicConversationOnline
_, err := appDynamicConversationOnline.WithContext(ctx).Where(appDynamicConversationOnline.ConversationID.Eq(conversationID)).
UpdateColumn(appDynamicConversationOnline.ConversationID, newConversationID)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("unknown env %v", env)
}
}

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"time"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
@ -38,6 +39,7 @@ const (
interruptEventListKeyPattern = "interrupt_event_list:%d"
interruptEventTTL = 24 * time.Hour // Example: expire after 24 hours
previousResumedEventKeyPattern = "previous_resumed_event:%d"
ConvToEventExecFormat = "conv_relate_info:%d"
)
// SaveInterruptEvents saves multiple interrupt events to the end of a Redis list.
@ -246,3 +248,33 @@ func (i *interruptEventStoreImpl) ListInterruptEvents(ctx context.Context, wfExe
return events, nil
}
func (i *interruptEventStoreImpl) BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) error {
data, err := sonic.Marshal(info)
if err != nil {
return err
}
result := i.redis.Set(ctx, fmt.Sprintf(ConvToEventExecFormat, convID), data, interruptEventTTL)
if result.Err() != nil {
return result.Err()
}
return nil
}
func (i *interruptEventStoreImpl) GetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error) {
data, err := i.redis.Get(ctx, fmt.Sprintf(ConvToEventExecFormat, convID)).Bytes()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, false, nil, nil
}
return nil, false, nil, err
}
rInfo := &entity.ConvRelatedInfo{}
err = sonic.UnmarshalString(string(data), rInfo)
if err != nil {
return nil, false, nil, err
}
return rInfo, true, func() error {
return i.redis.Del(ctx, fmt.Sprintf(ConvToEventExecFormat, convID)).Err()
}, nil
}

View File

@ -500,6 +500,10 @@ func (r *RepositoryImpl) UpdateMeta(ctx context.Context, id int64, metaUpdate *v
expressions = append(expressions, r.query.WorkflowMeta.LatestVersion.Value(*metaUpdate.LatestPublishedVersion))
}
if metaUpdate.WorkflowMode != nil {
expressions = append(expressions, r.query.WorkflowMeta.Mode.Value(int32(*metaUpdate.WorkflowMode)))
}
if len(expressions) == 0 {
return nil
}

View File

@ -22,6 +22,7 @@ import (
cloudworkflow "github.com/coze-dev/coze-studio/backend/api/model/ocean/cloud/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
@ -335,3 +336,91 @@ func (c *conversationImpl) replaceWorkflowsConversationName(ctx context.Context,
func (c *conversationImpl) DeleteDynamicConversation(ctx context.Context, env vo.Env, templateID int64) (int64, error) {
return c.repo.DeleteDynamicConversation(ctx, env, templateID)
}
func (c *conversationImpl) GetOrCreateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error) {
t, existed, err := c.repo.GetConversationTemplate(ctx, env, vo.GetConversationTemplatePolicy{
AppID: ptr.Of(appID),
Name: ptr.Of(conversationName),
})
if err != nil {
return 0, err
}
conversationIDGenerator := workflow.ConversationIDGenerator(func(ctx context.Context, appID int64, userID, connectorID int64) (int64, error) {
return conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
})
if existed {
conversationID, _, err := c.repo.GetOrCreateStaticConversation(ctx, env, conversationIDGenerator, &vo.CreateStaticConversation{
AppID: appID,
ConnectorID: connectorID,
UserID: userID,
TemplateID: t.TemplateID,
})
if err != nil {
return 0, err
}
return conversationID, nil
}
conversationID, _, err := c.repo.GetOrCreateDynamicConversation(ctx, env, conversationIDGenerator, &vo.CreateDynamicConversation{
AppID: appID,
ConnectorID: connectorID,
UserID: userID,
Name: conversationName,
})
if err != nil {
return 0, err
}
return conversationID, nil
}
func (c *conversationImpl) UpdateConversation(ctx context.Context, env vo.Env, appID, connectorID, userID int64, conversationName string) (int64, error) {
t, existed, err := c.repo.GetConversationTemplate(ctx, env, vo.GetConversationTemplatePolicy{
AppID: ptr.Of(appID),
Name: ptr.Of(conversationName),
})
if err != nil {
return 0, err
}
if existed {
newConversationID, err := conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
err = c.repo.UpdateStaticConversation(ctx, env, t.TemplateID, connectorID, userID, newConversationID)
if err != nil {
return 0, err
}
return newConversationID, nil
}
dy, existed, err := c.repo.GetDynamicConversationByName(ctx, env, appID, connectorID, userID, conversationName)
if err != nil {
return 0, err
}
if !existed {
return 0, fmt.Errorf("conversation name %v not found", conversationName)
}
newConversationID, err := conversation.GetConversationManager().CreateConversation(ctx, &conversation.CreateConversationRequest{
AppID: appID,
UserID: userID,
ConnectorID: connectorID,
})
err = c.repo.UpdateDynamicConversation(ctx, env, dy.ConversationID, newConversationID)
if err != nil {
return 0, err
}
return newConversationID, nil
}

View File

@ -380,6 +380,8 @@ func (i *impl) StreamExecute(ctx context.Context, config vo.ExecuteConfig, input
return nil, err
}
config.WorkflowMode = wfEntity.Mode
isApplicationWorkflow := wfEntity.AppID != nil
if isApplicationWorkflow && config.Mode == vo.ExecuteModeRelease {
err = i.checkApplicationWorkflowReleaseVersion(ctx, *wfEntity.AppID, config.ConnectorID, config.ID, config.Version)

View File

@ -851,6 +851,13 @@ func (i *impl) UpdateMeta(ctx context.Context, id int64, metaUpdate *vo.MetaUpda
return err
}
if metaUpdate.WorkflowMode != nil && *metaUpdate.WorkflowMode == cloudworkflow.WorkflowMode_ChatFlow {
err = i.adaptToChatFlow(ctx, id)
if err != nil {
return err
}
}
err = search.GetNotifier().PublishWorkflowResource(ctx, search.Updated, &search.Resource{
WorkflowID: id,
URI: metaUpdate.IconURI,
@ -1822,6 +1829,14 @@ func (i *impl) MGet(ctx context.Context, policy *vo.MGetPolicy) ([]*entity.Workf
}
}
func (i *impl) BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) error {
return i.repo.BindConvRelatedInfo(ctx, convID, info)
}
func (i *impl) GetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error) {
return i.repo.GetConvRelatedInfo(ctx, convID)
}
func (i *impl) calculateTestRunSuccess(ctx context.Context, c *vo.Canvas, wid int64) (bool, error) {
sc, err := adaptor.CanvasToWorkflowSchema(ctx, c)
if err != nil { // not even legal, test run can't possibly be successful
@ -2009,3 +2024,58 @@ func replaceRelatedWorkflowOrExternalResourceInWorkflowNodes(nodes []*vo.Node, r
func RegisterAllNodeAdaptors() {
adaptor.RegisterAllNodeAdaptors()
}
func (i *impl) adaptToChatFlow(ctx context.Context, wID int64) error {
wfEntity, err := i.repo.GetEntity(ctx, &vo.GetPolicy{
ID: wID,
QType: vo.FromDraft,
})
if err != nil {
return err
}
canvas := &vo.Canvas{}
err = sonic.UnmarshalString(wfEntity.Canvas, canvas)
if err != nil {
return err
}
var startNode *vo.Node
for _, node := range canvas.Nodes {
if node.Type == entity.NodeTypeEntry.IDStr() {
startNode = node
break
}
}
if startNode == nil {
return fmt.Errorf("can not find start node")
}
vMap := make(map[string]bool)
for _, o := range startNode.Data.Outputs {
v, err := vo.ParseVariable(o)
if err != nil {
return err
}
vMap[v.Name] = true
}
if _, ok := vMap["USER_INPUT"]; !ok {
startNode.Data.Outputs = append(startNode.Data.Outputs, &vo.Variable{
Name: "USER_INPUT",
Type: vo.VariableTypeString,
})
}
if _, ok := vMap["CONVERSATION_NAME"]; !ok {
startNode.Data.Outputs = append(startNode.Data.Outputs, &vo.Variable{
Name: "CONVERSATION_NAME",
Type: vo.VariableTypeString,
DefaultValue: "Default",
})
}
canvasStr, err := sonic.MarshalString(canvas)
if err != nil {
return err
}
return i.Save(ctx, wID, canvasStr)
}

View File

@ -2200,3 +2200,34 @@ struct OpenAPIGetWorkflowInfoResponse{
255: required base.BaseResp BaseResp
}
struct CreateConversationRequest {
1: optional map<string,string> MetaData (api.body = "meta_data") //自定义透传字段
3: optional i64 BotId (api.body = "bot_id", api.js_conv="true")
4: optional i64 ConnectorId (api.body= "connector_id", api.js_conv="true")
5: optional string SpaceID (api.body= "space_id", api.js_conv="true")
9 : optional string AppID (go.tag="json:\"app_id\"")
10: optional string WorkflowID (go.tag="json:\"workflow_id\"")
11: optional string ConversationMame (go.tag="json:\"conversation_name\"")
12: optional bool GetOrCreate (go.tag="json:\"get_or_create\"")
13: optional bool DraftMode (go.tag="json:\"draft_mode\"")
255: optional base.Base Base
}
struct CreateConversationResponse {
1: i64 code
2: string msg
3: optional ConversationData ConversationData (api.body = "data")
}
struct ConversationData {
1: i64 Id (api.body = "id", agw.key = "id", api.js_conv="true")
2: i64 CreatedAt (api.body = "created_at", agw.key = "created_at")
3: map<string,string> MetaData (api.body = "meta_data", agw.key = "meta_data")
4: optional i64 CreatorID (api.body = "creator_d", agw.key = "creator_d", api.js_conv="true")
5: optional i64 ConnectorID (api.body = "connector_id", agw.key="connector_id", api.js_conv="true")
6: optional i64 LastSectionID (api.body="last_section_id", api.js_conv="true")
7: optional i64 AccountID (api.body = "account_id")
}

View File

@ -66,6 +66,7 @@ service WorkflowService {
workflow.CreateChatFlowRoleResponse CreateChatFlowRole(1: workflow.CreateChatFlowRoleRequest request) (api.post='/api/workflow_api/chat_flow_role/create', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
workflow.DeleteChatFlowRoleResponse DeleteChatFlowRole(1: workflow.DeleteChatFlowRoleRequest request) (api.post='/api/workflow_api/chat_flow_role/delete', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
// App Release Management
// App 发布管理
workflow.ListPublishWorkflowResponse ListPublishWorkflow(1: workflow.ListPublishWorkflowRequest request) (api.post='/api/workflow_api/list_publish_workflow', api.category="workflow_api", api.gen_path="workflow_api", agw.preserve_base = "true")
// Open API
@ -75,4 +76,7 @@ service WorkflowService {
workflow.GetWorkflowRunHistoryResponse OpenAPIGetWorkflowRunHistory(1:workflow.GetWorkflowRunHistoryRequest request)(api.get='/v1/workflow/get_run_history', api.category="workflow_open_api", api.tag="openapi", api.gen_path="workflow_api", agw.preserve_base = "false")
workflow.ChatFlowRunResponse OpenAPIChatFlowRun(1: workflow.ChatFlowRunRequest request)(api.post = "/v1/workflows/chat", api.category="workflow_open_api", api.tag="openapi", api.gen_path="workflow_open_api")
workflow.OpenAPIGetWorkflowInfoResponse OpenAPIGetWorkflowInfo(1: workflow.OpenAPIGetWorkflowInfoRequest request)(api.get = "/v1/workflows/:workflow_id", api.category="workflow_open_api", api.tag="openapi", api.gen_path="workflow_open_api")
workflow.CreateConversationRequest OpenAPICreateConversation(1: workflow.CreateConversationRequest request)(api.post = "/v1/workflow/conversation/create", api.category="workflow_open_api",api.tag="openapi", api.gen_path="workflow_open_api", agw.preserve_base = "true")
}

18
scripts/setup/python.sh Executable file → Normal file
View File

@ -1,4 +1,20 @@
#!/usr/bin/env bash
#
# Copyright 2025 coze-dev Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
SETUP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SCRIPT_DIR="$(dirname "$SETUP_DIR")"
@ -64,4 +80,4 @@ if [ -f "$WORKFLOW_SANBOX" ]; then
else
echo "$WORKFLOW_SANBOX file not found"
exit 1
fi
fi