fix: interact with the RunRecord table in chatflow run and message nodes

This commit is contained in:
lvxinyu.1117
2025-08-14 17:46:56 +08:00
parent dbcaba309c
commit 9b0998a939
8 changed files with 119 additions and 53 deletions

View File

@ -24,6 +24,7 @@ import (
"strconv"
"strings"
crossagentrun "github.com/coze-dev/coze-studio/backend/crossdomain/contract/agentrun"
crossconversation "github.com/coze-dev/coze-studio/backend/crossdomain/contract/conversation"
crossmessage "github.com/coze-dev/coze-studio/backend/crossdomain/contract/message"
@ -31,6 +32,7 @@ import (
"github.com/coze-dev/coze-studio/backend/api/model/crossdomain/message"
"github.com/coze-dev/coze-studio/backend/api/model/workflow"
"github.com/coze-dev/coze-studio/backend/application/base/ctxutil"
agententity "github.com/coze-dev/coze-studio/backend/domain/conversation/agentrun/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
@ -560,11 +562,19 @@ func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workfl
sectionID = sID
}
roundID, err := w.IDGenerator.GenID(ctx)
runRecord, err := crossagentrun.DefaultSVC().Create(ctx, &agententity.AgentRunMeta{
AgentID: resolveAppID,
ConversationID: conversationID,
UserID: strconv.FormatInt(userID, 10),
ConnectorID: connectorID,
SectionID: sectionID,
})
if err != nil {
return nil, vo.WrapError(errno.ErrIDGenError, err)
return nil, err
}
roundID := runRecord.ID
userMessage, err := toConversationMessage(ctx, resolveAppID, conversationID, userID, roundID, sectionID, message.MessageTypeQuestion, lastUserMessage)
if err != nil {
return nil, err

View File

@ -25,6 +25,7 @@ import (
type AgentRun interface {
Delete(ctx context.Context, runID []int64) error
List(ctx context.Context, ListMeta *entity.ListRunRecordMeta) ([]*entity.RunRecordMeta, error)
Create(ctx context.Context, runRecord *entity.AgentRunMeta) (*entity.RunRecordMeta, error)
}
var defaultSVC AgentRun

View File

@ -49,3 +49,7 @@ func (c *impl) Delete(ctx context.Context, runID []int64) error {
func (c *impl) List(ctx context.Context, meta *entity.ListRunRecordMeta) ([]*entity.RunRecordMeta, error) {
return c.DomainSVC.List(ctx, meta)
}
func (c *impl) Create(ctx context.Context, meta *entity.AgentRunMeta) (*entity.RunRecordMeta, error) {
return c.DomainSVC.Create(ctx, meta)
}

View File

@ -20,8 +20,11 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync/atomic"
crossagentrun "github.com/coze-dev/coze-studio/backend/crossdomain/contract/agentrun"
agententity "github.com/coze-dev/coze-studio/backend/domain/conversation/agentrun/entity"
"github.com/coze-dev/coze-studio/backend/domain/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/crossdomain/conversation"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
@ -180,13 +183,34 @@ func (c *CreateMessage) Invoke(ctx context.Context, input map[string]any) (map[s
currentConversationID := execCtx.ExeCfg.ConversationID
isCurrentConversation := currentConversationID != nil && *currentConversationID == conversationID
var runID int64
if role == "user" {
// For user messages, always create a new run and store the ID in the context.
newRunID, err := workflow.GetRepository().GenID(ctx)
var sectionID int64
if isCurrentConversation {
if execCtx.ExeCfg.SectionID != nil {
sectionID = *execCtx.ExeCfg.SectionID
} else {
return nil, vo.WrapError(errno.ErrInvalidParameter, errors.New("section id is required"))
}
} else {
cInfo, err := c.Creator.GetByID(ctx, conversationID)
if err != nil {
return nil, err
}
sectionID = cInfo.SectionID
}
if role == "user" {
// For user messages, always create a new run and store the ID in the context.
runRecord, err := crossagentrun.DefaultSVC().Create(ctx, &agententity.AgentRunMeta{
AgentID: resolvedAppID,
ConversationID: conversationID,
UserID: strconv.FormatInt(userID, 10),
ConnectorID: connectorID,
SectionID: sectionID,
})
if err != nil {
return nil, err
}
newRunID := runRecord.ID
if execCtx.ExeCfg.RoundID != nil {
atomic.StoreInt64(execCtx.ExeCfg.RoundID, newRunID)
}
@ -221,29 +245,20 @@ func (c *CreateMessage) Invoke(ctx context.Context, input map[string]any) (map[s
if len(runIDs) > 0 && runIDs[0] != 0 {
runID = runIDs[0]
} else {
newRunID, err := workflow.GetRepository().GenID(ctx)
runRecord, err := crossagentrun.DefaultSVC().Create(ctx, &agententity.AgentRunMeta{
AgentID: resolvedAppID,
ConversationID: conversationID,
UserID: strconv.FormatInt(userID, 10),
ConnectorID: connectorID,
SectionID: sectionID,
})
if err != nil {
return nil, err
}
runID = newRunID
runID = runRecord.ID
}
}
var sectionID int64
if isCurrentConversation {
if execCtx.ExeCfg.SectionID != nil {
sectionID = *execCtx.ExeCfg.SectionID
} else {
return nil, vo.WrapError(errno.ErrInvalidParameter, errors.New("section id is required"))
}
} else {
cInfo, err := c.Creator.GetByID(ctx, conversationID)
if err != nil {
return nil, err
}
sectionID = cInfo.SectionID
}
mID, err := c.Creator.CreateMessage(ctx, &conversation.CreateMessageRequest{
ConversationID: conversationID,
Role: role,

View File

@ -349,20 +349,30 @@ func (id *IntentDetector) ToCallbackInput(ctx context.Context, in map[string]any
}
if len(messages) == 0 {
if id.ChatHistorySetting.EnableChatHistory {
ret := map[string]any{
"chatHistory": []any{},
"query": in["query"],
}
return ret, nil
}
return in, nil
}
count := 0
endIdx := 0
var historyMessages []any
for _, msg := range messages {
if count > int(id.ChatHistorySetting.ChatHistoryRound) {
break
}
if msg.Role == schema.User {
startIdx := 0
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == schema.User {
count++
}
endIdx++
if count >= int(id.ChatHistorySetting.ChatHistoryRound) {
startIdx = i
break
}
}
var historyMessages []any
for _, msg := range messages[startIdx:] {
content, err := nodesconversation.ConvertMessageToString(ctx, msg)
if err != nil {
logs.CtxWarnf(ctx, "failed to convert message to string: %v", err)
@ -373,7 +383,7 @@ func (id *IntentDetector) ToCallbackInput(ctx context.Context, in map[string]any
"content": content,
})
}
ctxcache.Store(ctx, chatHistoryKey, messages[:endIdx])
ctxcache.Store(ctx, chatHistoryKey, messages[startIdx:])
ret := map[string]any{
"chatHistory": historyMessages,

View File

@ -258,20 +258,30 @@ func (kr *Retrieve) ToCallbackInput(ctx context.Context, in map[string]any) (map
messages = execCtx.ExeCfg.ConversationHistory
}
if len(messages) == 0 {
if kr.ChatHistorySetting.EnableChatHistory {
ret := map[string]any{
"chatHistory": []any{},
"Query": in["Query"],
}
return ret, nil
}
return in, nil
}
count := 0
endIdx := 0
var historyMessages []any
for _, msg := range messages {
if count > int(kr.ChatHistorySetting.ChatHistoryRound) {
break
}
if msg.Role == einoSchema.User {
startIdx := 0
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == einoSchema.User {
count++
}
endIdx++
if count >= int(kr.ChatHistorySetting.ChatHistoryRound) {
startIdx = i
break
}
}
var historyMessages []any
for _, msg := range messages[startIdx:] {
content, err := nodesconversation.ConvertMessageToString(ctx, msg)
if err != nil {
logs.CtxWarnf(ctx, "failed to convert message to string: %v", err)
@ -282,7 +292,7 @@ func (kr *Retrieve) ToCallbackInput(ctx context.Context, in map[string]any) (map
"content": content,
})
}
ctxcache.Store(ctx, chatHistoryKey, messages[:endIdx])
ctxcache.Store(ctx, chatHistoryKey, messages[startIdx:])
ret := map[string]any{
"chatHistory": historyMessages,

View File

@ -1244,20 +1244,36 @@ func (l *LLM) ToCallbackInput(ctx context.Context, input map[string]any) (map[st
messages = execCtx.ExeCfg.ConversationHistory
}
if len(messages) == 0 {
if l.chatHistorySetting.EnableChatHistory {
ret := map[string]any{
"chatHistory": []any{},
}
for k, v := range input {
ret[k] = v
}
return ret, nil
}
return input, nil
}
maxRounds := int(l.chatHistorySetting.ChatHistoryRound)
if execCtx != nil && execCtx.ExeCfg.MaxHistoryRounds != nil {
maxRounds = min(int(*execCtx.ExeCfg.MaxHistoryRounds), maxRounds)
}
count := 0
endIdx := 0
var historyMessages []any
for _, msg := range messages {
if count > int(l.chatHistorySetting.ChatHistoryRound) {
break
}
if msg.Role == schema.User {
startIdx := 0
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == schema.User {
count++
}
endIdx++
if count >= maxRounds {
startIdx = i
break
}
}
var historyMessages []any
for _, msg := range messages[startIdx:] {
content, err := nodesconversation.ConvertMessageToString(ctx, msg)
if err != nil {
logs.CtxWarnf(ctx, "failed to convert message to string: %v", err)
@ -1268,7 +1284,7 @@ func (l *LLM) ToCallbackInput(ctx context.Context, input map[string]any) (map[st
"content": content,
})
}
ctxcache.Store(ctx, chatHistoryKey, messages[:endIdx])
ctxcache.Store(ctx, chatHistoryKey, messages[startIdx:])
ret := map[string]any{
"chatHistory": historyMessages,

View File

@ -319,7 +319,7 @@ func (i *impl) AsyncExecuteNode(ctx context.Context, nodeID string, config vo.Ex
messages, err := i.prefetchChatHistory(ctx, config, historyRounds)
if err != nil {
logs.CtxErrorf(ctx, "failed to prefetch chat history: %v", err)
} else if len(messages) > 0 {
} else if len(messages) > 0 || messages != nil {
config.ConversationHistory = messages
}
}
@ -421,7 +421,7 @@ func (i *impl) StreamExecute(ctx context.Context, config vo.ExecuteConfig, input
messages, err := i.prefetchChatHistory(ctx, config, historyRounds)
if err != nil {
logs.CtxErrorf(ctx, "failed to prefetch chat history: %v", err)
} else if len(messages) > 0 {
} else if len(messages) > 0 || messages != nil {
config.ConversationHistory = messages
}
}
@ -1027,7 +1027,7 @@ func (i *impl) prefetchChatHistory(ctx context.Context, config vo.ExecuteConfig,
ConversationID: *convID,
AppID: resolvedAppID,
UserID: userID,
Rounds: historyRounds,
Rounds: historyRounds + 1,
SectionID: *sectionID,
}