feat: support openapi chat with non-stream (#2431)

This commit is contained in:
junwen-lee
2025-11-04 14:33:02 +08:00
committed by GitHub
parent 16396f5693
commit 2eb29dab70
16 changed files with 3274 additions and 26 deletions

View File

@ -28,11 +28,12 @@ import (
"github.com/coze-dev/coze-studio/backend/api/model/app/bot_common"
"github.com/coze-dev/coze-studio/backend/api/model/conversation/common"
"github.com/coze-dev/coze-studio/backend/api/model/conversation/message"
"github.com/coze-dev/coze-studio/backend/api/model/conversation/run"
"github.com/coze-dev/coze-studio/backend/application/base/ctxutil"
singleagent "github.com/coze-dev/coze-studio/backend/crossdomain/agent/model"
agentrun "github.com/coze-dev/coze-studio/backend/crossdomain/agentrun/model"
message "github.com/coze-dev/coze-studio/backend/crossdomain/message/model"
crossmessage "github.com/coze-dev/coze-studio/backend/crossdomain/message/model"
saEntity "github.com/coze-dev/coze-studio/backend/domain/agent/singleagent/entity"
"github.com/coze-dev/coze-studio/backend/domain/conversation/agentrun/entity"
convEntity "github.com/coze-dev/coze-studio/backend/domain/conversation/conversation/entity"
@ -228,10 +229,10 @@ func (a *OpenapiAgentRunApplication) buildDisplayContent(_ context.Context, ar *
return ""
}
func (a *OpenapiAgentRunApplication) parseQueryContent(ctx context.Context, multiAdditionalMessages []*entity.AdditionalMessage) ([]*entity.AdditionalMessage, []*message.InputMetaData, message.ContentType, error) {
func (a *OpenapiAgentRunApplication) parseQueryContent(ctx context.Context, multiAdditionalMessages []*entity.AdditionalMessage) ([]*entity.AdditionalMessage, []*crossmessage.InputMetaData, crossmessage.ContentType, error) {
var multiContent []*message.InputMetaData
var contentType message.ContentType
var multiContent []*crossmessage.InputMetaData
var contentType crossmessage.ContentType
var filterMultiAdditionalMessages []*entity.AdditionalMessage
filterMultiAdditionalMessages = multiAdditionalMessages
@ -258,7 +259,7 @@ func (a *OpenapiAgentRunApplication) parseAdditionalMessages(ctx context.Context
if item.Role != string(schema.User) && item.Role != string(schema.Assistant) {
return nil, errors.New("additional message role only support user and assistant")
}
if item.Type != nil && !slices.Contains([]message.MessageType{message.MessageTypeQuestion, message.MessageTypeAnswer}, message.MessageType(*item.Type)) {
if item.Type != nil && !slices.Contains([]crossmessage.MessageType{crossmessage.MessageTypeQuestion, crossmessage.MessageTypeAnswer}, crossmessage.MessageType(*item.Type)) {
return nil, errors.New("additional message type only support question and answer now")
}
@ -266,9 +267,9 @@ func (a *OpenapiAgentRunApplication) parseAdditionalMessages(ctx context.Context
Role: schema.RoleType(item.Role),
}
if item.Type != nil {
addOne.Type = message.MessageType(*item.Type)
addOne.Type = crossmessage.MessageType(*item.Type)
} else {
addOne.Type = message.MessageTypeQuestion
addOne.Type = crossmessage.MessageTypeQuestion
}
if item.ContentType == run.ContentTypeText {
@ -276,20 +277,20 @@ func (a *OpenapiAgentRunApplication) parseAdditionalMessages(ctx context.Context
continue
}
addOne.ContentType = message.ContentTypeText
addOne.Content = []*message.InputMetaData{{
Type: message.InputTypeText,
addOne.ContentType = crossmessage.ContentTypeText
addOne.Content = []*crossmessage.InputMetaData{{
Type: crossmessage.InputTypeText,
Text: item.Content,
}}
}
if item.ContentType == run.ContentTypeMixApi {
if ptr.From(item.Type) == string(message.MessageTypeAnswer) {
if ptr.From(item.Type) == string(crossmessage.MessageTypeAnswer) {
return nil, errors.New(" answer messages only support text content")
}
addOne.ContentType = message.ContentTypeMix
addOne.ContentType = crossmessage.ContentTypeMix
var inputs []*run.AdditionalContent
err := json.Unmarshal([]byte(item.Content), &inputs)
@ -301,14 +302,14 @@ func (a *OpenapiAgentRunApplication) parseAdditionalMessages(ctx context.Context
if one == nil {
continue
}
switch message.InputType(one.Type) {
case message.InputTypeText:
switch crossmessage.InputType(one.Type) {
case crossmessage.InputTypeText:
addOne.Content = append(addOne.Content, &message.InputMetaData{
Type: message.InputTypeText,
addOne.Content = append(addOne.Content, &crossmessage.InputMetaData{
Type: crossmessage.InputTypeText,
Text: ptr.From(one.Text),
})
case message.InputTypeImage, message.InputTypeFile:
case crossmessage.InputTypeImage, crossmessage.InputTypeFile:
var fileUrl, fileURI string
if one.GetFileURL() != "" {
@ -323,9 +324,9 @@ func (a *OpenapiAgentRunApplication) parseAdditionalMessages(ctx context.Context
fileUrl = fileInfo.File.Url
fileURI = fileInfo.File.TosURI
}
addOne.Content = append(addOne.Content, &message.InputMetaData{
Type: message.InputType(one.Type),
FileData: []*message.FileData{
addOne.Content = append(addOne.Content, &crossmessage.InputMetaData{
Type: crossmessage.InputType(one.Type),
FileData: []*crossmessage.FileData{
{
Url: fileUrl,
URI: fileURI,
@ -417,6 +418,212 @@ func buildARSM2ApiChatMessage(chunk *entity.AgentRunResponse) []byte {
return mCM
}
func (a *OpenapiAgentRunApplication) RetrieveRunRecord(ctx context.Context, req *run.RetrieveChatOpenRequest) (*run.RetrieveChatOpenResponse, error) {
resp := new(run.RetrieveChatOpenResponse)
apiKeyInfo := ctxutil.GetApiAuthFromCtx(ctx)
userID := apiKeyInfo.UserID
// Get agent run record by chat ID
runRecord, err := ConversationSVC.AgentRunDomainSVC.GetByID(ctx, req.ChatID)
if err != nil {
return nil, err
}
if runRecord == nil {
return nil, errorx.New(errno.ErrRecordNotFound)
}
// Get conversation data and check permissions
conversationData, err := ConversationSVC.ConversationDomainSVC.GetByID(ctx, req.ConversationID)
if err != nil {
return nil, err
}
if userID != conversationData.CreatorID {
return nil, errorx.New(errno.ErrConversationPermissionCode, errorx.KV("msg", "user not match"))
}
// Build response with chat detail
resp.ChatDetail = &run.ChatV3ChatDetail{
ID: runRecord.ID,
ConversationID: runRecord.ConversationID,
BotID: runRecord.AgentID,
Status: string(runRecord.Status),
SectionID: ptr.Of(runRecord.SectionID),
CreatedAt: ptr.Of(int32(runRecord.CreatedAt / 1000)),
CompletedAt: ptr.Of(int32(runRecord.CompletedAt / 1000)),
FailedAt: ptr.Of(int32(runRecord.FailedAt / 1000)),
}
// Add usage information if available
if runRecord.Usage != nil {
resp.ChatDetail.Usage = &run.Usage{
TokenCount: ptr.Of(int32(runRecord.Usage.LlmTotalTokens)),
InputTokens: ptr.Of(int32(runRecord.Usage.LlmPromptTokens)),
OutputTokens: ptr.Of(int32(runRecord.Usage.LlmCompletionTokens)),
}
}
return resp, nil
}
func (a *OpenapiAgentRunApplication) ListChatMessageApi(ctx context.Context, req *message.ListChatMessageApiRequest) (*message.ListChatMessageApiResponse, error) {
resp := new(message.ListChatMessageApiResponse)
apiKeyInfo := ctxutil.GetApiAuthFromCtx(ctx)
userID := apiKeyInfo.UserID
// Get conversation data and check permissions
conversationData, err := ConversationSVC.ConversationDomainSVC.GetByID(ctx, req.ConversationID)
if err != nil {
return nil, err
}
if userID != conversationData.CreatorID {
return nil, errorx.New(errno.ErrConversationPermissionCode, errorx.KV("msg", "user not match"))
}
// Get messages by run IDs
messages, err := ConversationSVC.MessageDomainSVC.GetByRunIDs(ctx, req.ConversationID, []int64{req.ChatID})
if err != nil {
return nil, err
}
// Convert domain messages to API messages
var apiMessages []*message.ChatV3MessageDetail
for _, msg := range messages {
apiMessage := &message.ChatV3MessageDetail{
ID: msg.ID,
ConversationID: msg.ConversationID,
BotID: msg.AgentID,
Role: string(msg.Role),
Type: string(msg.MessageType),
Content: msg.Content,
ContentType: string(msg.ContentType),
MetaData: msg.Ext,
ChatID: msg.RunID,
ReasoningContent: ptr.Of(msg.ReasoningContent),
CreatedAt: ptr.Of(msg.CreatedAt / 1000),
SectionID: ptr.Of(msg.SectionID),
}
apiMessages = append(apiMessages, apiMessage)
}
resp.Messages = apiMessages
return resp, nil
}
func (a *OpenapiAgentRunApplication) OpenapiAgentRunSync(ctx context.Context, ar *run.ChatV3Request) (*run.RetrieveChatOpenResponse, error) {
apiKeyInfo := ctxutil.GetApiAuthFromCtx(ctx)
creatorID := apiKeyInfo.UserID
connectorID := apiKeyInfo.ConnectorID
if ptr.From(ar.ConnectorID) == consts.WebSDKConnectorID {
connectorID = ptr.From(ar.ConnectorID)
}
agentInfo, caErr := a.checkAgent(ctx, ar, connectorID)
if caErr != nil {
logs.CtxErrorf(ctx, "checkAgent err:%v", caErr)
return nil, caErr
}
conversationData, ccErr := a.checkConversation(ctx, ar, creatorID, connectorID)
if ccErr != nil {
logs.CtxErrorf(ctx, "checkConversation err:%v", ccErr)
return nil, ccErr
}
spaceID := agentInfo.SpaceID
arr, err := a.buildAgentRunRequest(ctx, ar, connectorID, spaceID, conversationData)
if err != nil {
logs.CtxErrorf(ctx, "buildAgentRunRequest err:%v", err)
return nil, err
}
// Execute agent run synchronously
streamer, err := ConversationSVC.AgentRunDomainSVC.AgentRun(ctx, arr)
if err != nil {
return nil, err
}
var finalChatDetail *run.ChatV3ChatDetail
var backgroundProcessingStarted bool
startBackgroundProcessing := func() {
if !backgroundProcessingStarted {
backgroundProcessingStarted = true
go func() {
defer func() {
if r := recover(); r != nil {
logs.CtxErrorf(ctx, "background stream processing panic: %v", r)
}
}()
for {
_, recvErr := streamer.Recv()
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
logs.CtxInfof(ctx, "background stream processing completed")
} else {
logs.CtxErrorf(ctx, "background stream processing error: %v", recvErr)
}
break
}
}
}()
}
}
for {
chunk, recvErr := streamer.Recv()
logs.CtxInfof(ctx, "chunk :%v, err:%v", conv.DebugJsonToStr(chunk), recvErr)
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
break
}
return nil, errorx.New(errno.ErrConversationAgentRunError, errorx.KV("msg", recvErr.Error()))
}
switch chunk.Event {
case entity.RunEventError:
return nil, errorx.New(int32(chunk.Error.Code), errorx.KV("msg", chunk.Error.Msg))
case entity.RunEventCreated:
chunkRunItem := chunk.ChunkRunItem
finalChatDetail = &run.ChatV3ChatDetail{
ID: chunkRunItem.ID,
ConversationID: chunkRunItem.ConversationID,
BotID: chunkRunItem.AgentID,
Status: string(chunkRunItem.Status),
SectionID: ptr.Of(chunkRunItem.SectionID),
CreatedAt: ptr.Of(int32(chunkRunItem.CreatedAt / 1000)),
CompletedAt: ptr.Of(int32(chunkRunItem.CompletedAt / 1000)),
FailedAt: ptr.Of(int32(chunkRunItem.FailedAt / 1000)),
}
if chunkRunItem.Usage != nil {
finalChatDetail.Usage = &run.Usage{
TokenCount: ptr.Of(int32(chunkRunItem.Usage.LlmTotalTokens)),
InputTokens: ptr.Of(int32(chunkRunItem.Usage.LlmPromptTokens)),
OutputTokens: ptr.Of(int32(chunkRunItem.Usage.LlmCompletionTokens)),
}
}
startBackgroundProcessing()
goto exitLoop
default:
logs.CtxInfof(ctx, "received event: %v, starting background processing and continuing to listen", chunk.Event)
startBackgroundProcessing()
}
}
exitLoop:
if finalChatDetail == nil {
return nil, errorx.New(errno.ErrConversationAgentRunError, errorx.KV("msg", "no final result received"))
}
resp := &run.RetrieveChatOpenResponse{
ChatDetail: finalChatDetail,
}
return resp, nil
}
func (a *OpenapiAgentRunApplication) CancelRun(ctx context.Context, req *run.CancelChatApiRequest) (*run.CancelChatApiResponse, error) {
resp := new(run.CancelChatApiResponse)

View File

@ -19,15 +19,19 @@ package conversation
import (
"context"
"errors"
"io"
"testing"
"github.com/cloudwego/eino/schema"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"github.com/coze-dev/coze-studio/backend/api/model/conversation/common"
"github.com/coze-dev/coze-studio/backend/api/model/conversation/run"
singleagent "github.com/coze-dev/coze-studio/backend/crossdomain/agent/model"
agentrun "github.com/coze-dev/coze-studio/backend/crossdomain/agentrun/model"
saEntity "github.com/coze-dev/coze-studio/backend/domain/agent/singleagent/entity"
"github.com/coze-dev/coze-studio/backend/domain/conversation/agentrun/entity"
convEntity "github.com/coze-dev/coze-studio/backend/domain/conversation/conversation/entity"
openapiEntity "github.com/coze-dev/coze-studio/backend/domain/openauth/openapiauth/entity"
cmdEntity "github.com/coze-dev/coze-studio/backend/domain/shortcutcmd/entity"
@ -899,3 +903,274 @@ func TestOpenapiAgentRun_ParseAdditionalMessages_NilMessage(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "mock stream error")
}
type MockStreamReader struct {
chunks []*entity.AgentRunResponse
index int
}
func (m *MockStreamReader) Recv() (*entity.AgentRunResponse, error) {
if m.index >= len(m.chunks) {
return nil, io.EOF
}
response := m.chunks[m.index]
m.index++
return response, nil
}
func newMockStreamReader(chunks []*entity.AgentRunResponse) *schema.StreamReader[*entity.AgentRunResponse] {
sr, sw := schema.Pipe[*entity.AgentRunResponse](10)
go func() {
defer sw.Close()
for _, chunk := range chunks {
sw.Send(chunk, nil)
}
}()
return sr
}
func TestOpenapiAgentRunSync_Success(t *testing.T) {
app, _, _, mockAgentRun, mockConversation, mockSingleAgent, _ := setupMocks(t)
ctx := createTestContext()
req := &run.ChatV3Request{
BotID: 67890,
ConversationID: ptr.Of(int64(11111)),
User: "test-user",
AdditionalMessages: []*run.EnterMessage{
{
Role: "user",
Content: "test query",
ContentType: run.ContentTypeText,
},
},
}
// Mock agent check success
mockAgent := &saEntity.SingleAgent{
SingleAgent: &singleagent.SingleAgent{
AgentID: 67890,
SpaceID: 54321,
},
}
mockSingleAgent.EXPECT().ObtainAgentByIdentity(ctx, gomock.Any()).Return(mockAgent, nil)
// Mock conversation check success
mockConv := &convEntity.Conversation{
ID: 11111,
CreatorID: 12345,
SectionID: 98765,
}
mockConversation.EXPECT().GetByID(ctx, int64(11111)).Return(mockConv, nil)
// Mock successful agent run with stream
mockStream := newMockStreamReader([]*entity.AgentRunResponse{
{
Event: entity.RunEventCreated,
ChunkRunItem: &entity.RunRecordMeta{
ID: 999,
ConversationID: 11111,
AgentID: 67890,
Status: entity.RunStatusCompleted,
SectionID: 98765,
CreatedAt: 1640995200000, // 2022-01-01 00:00:00
CompletedAt: 1640995260000, // 2022-01-01 00:01:00
Usage: &agentrun.Usage{
LlmTotalTokens: 100,
LlmPromptTokens: 60,
LlmCompletionTokens: 40,
},
},
},
})
mockAgentRun.EXPECT().AgentRun(ctx, gomock.Any()).Return(mockStream, nil)
result, err := app.OpenapiAgentRunSync(ctx, req)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.NotNil(t, result.ChatDetail)
assert.Equal(t, int64(999), result.ChatDetail.ID)
assert.Equal(t, int64(11111), result.ChatDetail.ConversationID)
assert.Equal(t, int64(67890), result.ChatDetail.BotID)
assert.Equal(t, string(entity.RunStatusCompleted), result.ChatDetail.Status)
assert.Equal(t, ptr.Of(int64(98765)), result.ChatDetail.SectionID)
assert.Equal(t, ptr.Of(int32(1640995200)), result.ChatDetail.CreatedAt)
assert.Equal(t, ptr.Of(int32(1640995260)), result.ChatDetail.CompletedAt)
assert.NotNil(t, result.ChatDetail.Usage)
assert.Equal(t, ptr.Of(int32(100)), result.ChatDetail.Usage.TokenCount)
assert.Equal(t, ptr.Of(int32(60)), result.ChatDetail.Usage.InputTokens)
assert.Equal(t, ptr.Of(int32(40)), result.ChatDetail.Usage.OutputTokens)
}
func TestOpenapiAgentRunSync_AgentNotFound(t *testing.T) {
app, _, _, _, _, mockSingleAgent, _ := setupMocks(t)
ctx := createTestContext()
req := &run.ChatV3Request{
BotID: 67890,
AdditionalMessages: []*run.EnterMessage{
{
Role: "user",
Content: "test query",
ContentType: run.ContentTypeText,
},
},
}
// Mock agent check failure
mockSingleAgent.EXPECT().ObtainAgentByIdentity(ctx, gomock.Any()).Return(nil, nil)
result, err := app.OpenapiAgentRunSync(ctx, req)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "agent not exists")
}
func TestOpenapiAgentRunSync_ConversationPermissionError(t *testing.T) {
app, _, _, _, mockConversation, mockSingleAgent, _ := setupMocks(t)
ctx := createTestContext()
req := &run.ChatV3Request{
BotID: 67890,
ConversationID: ptr.Of(int64(11111)),
AdditionalMessages: []*run.EnterMessage{
{
Role: "user",
Content: "test query",
ContentType: run.ContentTypeText,
},
},
}
// Mock agent check success
mockAgent := &saEntity.SingleAgent{
SingleAgent: &singleagent.SingleAgent{
AgentID: 67890,
SpaceID: 54321,
},
}
mockSingleAgent.EXPECT().ObtainAgentByIdentity(ctx, gomock.Any()).Return(mockAgent, nil)
// Mock conversation check with wrong user
mockConv := &convEntity.Conversation{
ID: 11111,
CreatorID: 99999, // Different user ID
SectionID: 98765,
}
mockConversation.EXPECT().GetByID(ctx, int64(11111)).Return(mockConv, nil)
result, err := app.OpenapiAgentRunSync(ctx, req)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "user not match")
}
func TestOpenapiAgentRunSync_StreamError(t *testing.T) {
app, _, _, mockAgentRun, mockConversation, mockSingleAgent, _ := setupMocks(t)
ctx := createTestContext()
req := &run.ChatV3Request{
BotID: 67890,
ConversationID: ptr.Of(int64(11111)),
User: "test-user",
AdditionalMessages: []*run.EnterMessage{
{
Role: "user",
Content: "test query",
ContentType: run.ContentTypeText,
},
},
}
// Mock agent check success
mockAgent := &saEntity.SingleAgent{
SingleAgent: &singleagent.SingleAgent{
AgentID: 67890,
SpaceID: 54321,
},
}
mockSingleAgent.EXPECT().ObtainAgentByIdentity(ctx, gomock.Any()).Return(mockAgent, nil)
// Mock conversation check success
mockConv := &convEntity.Conversation{
ID: 11111,
CreatorID: 12345,
SectionID: 98765,
}
mockConversation.EXPECT().GetByID(ctx, int64(11111)).Return(mockConv, nil)
// Mock stream with error event
mockStream := newMockStreamReader([]*entity.AgentRunResponse{
{
Event: entity.RunEventError,
Error: &entity.RunError{
Code: 500,
Msg: "agent run failed",
},
},
})
mockAgentRun.EXPECT().AgentRun(ctx, gomock.Any()).Return(mockStream, nil)
result, err := app.OpenapiAgentRunSync(ctx, req)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "code=500")
}
func TestOpenapiAgentRunSync_NoFinalResult(t *testing.T) {
app, _, _, mockAgentRun, mockConversation, mockSingleAgent, _ := setupMocks(t)
ctx := createTestContext()
req := &run.ChatV3Request{
BotID: 67890,
ConversationID: ptr.Of(int64(11111)),
User: "test-user",
AdditionalMessages: []*run.EnterMessage{
{
Role: "user",
Content: "test query",
ContentType: run.ContentTypeText,
},
},
}
// Mock agent check success
mockAgent := &saEntity.SingleAgent{
SingleAgent: &singleagent.SingleAgent{
AgentID: 67890,
SpaceID: 54321,
},
}
mockSingleAgent.EXPECT().ObtainAgentByIdentity(ctx, gomock.Any()).Return(mockAgent, nil)
// Mock conversation check success
mockConv := &convEntity.Conversation{
ID: 11111,
CreatorID: 12345,
SectionID: 98765,
}
mockConversation.EXPECT().GetByID(ctx, int64(11111)).Return(mockConv, nil)
// Mock stream with no RunEventCreated event
mockStream := newMockStreamReader([]*entity.AgentRunResponse{
{
Event: entity.RunEventMessageDelta, // Different event type
},
})
mockAgentRun.EXPECT().AgentRun(ctx, gomock.Any()).Return(mockStream, nil)
result, err := app.OpenapiAgentRunSync(ctx, req)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "no final result received")
}