feat(singleagent): chatflow as agent interrupt

This commit is contained in:
lijunwen.gigoo
2025-08-13 15:46:45 +08:00
parent 77ac3001f0
commit ece8bd3a47
3 changed files with 55 additions and 14 deletions

View File

@ -73,6 +73,37 @@ type runtimeDependence struct {
usage *agentrun.Usage usage *agentrun.Usage
} }
func (rd *runtimeDependence) SetRunID(runID int64) {
rd.runID = runID
}
func (rd *runtimeDependence) GetRunID() int64 {
return rd.runID
}
func (rd *runtimeDependence) SetRunMeta(arm *entity.AgentRunMeta) {
rd.runMeta = arm
}
func (rd *runtimeDependence) GetRunMeta() *entity.AgentRunMeta {
return rd.runMeta
}
func (rd *runtimeDependence) SetAgentInfo(agentInfo *singleagent.SingleAgent) {
rd.agentInfo = agentInfo
}
func (rd *runtimeDependence) GetAgentInfo() *singleagent.SingleAgent {
return rd.agentInfo
}
func (rd *runtimeDependence) SetQuestionMsgID(msgID int64) {
rd.questionMsgID = msgID
}
func (rd *runtimeDependence) GetQuestionMsgID() int64 {
return rd.questionMsgID
}
func (rd *runtimeDependence) SetStartTime(t time.Time) {
rd.startTime = t
}
func (rd *runtimeDependence) GetStartTime() time.Time {
return rd.startTime
}
type Components struct { type Components struct {
RunRecordRepo repository.RunRecordRepo RunRecordRepo repository.RunRecordRepo
ImagexSVC imagex.ImageX ImagexSVC imagex.ImageX
@ -116,7 +147,7 @@ func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.Agent
return return
} }
rtDependence.agentInfo = agentInfo rtDependence.SetAgentInfo(agentInfo)
history, err := c.handlerHistory(ctx, rtDependence) history, err := c.handlerHistory(ctx, rtDependence)
if err != nil { if err != nil {
@ -128,7 +159,7 @@ func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.Agent
if err != nil { if err != nil {
return return
} }
rtDependence.runID = runRecord.ID rtDependence.SetRunID(runRecord.ID)
defer func() { defer func() {
srRecord := c.buildSendRunRecord(ctx, runRecord, entity.RunStatusCompleted) srRecord := c.buildSendRunRecord(ctx, runRecord, entity.RunStatusCompleted)
if err != nil { if err != nil {
@ -147,9 +178,9 @@ func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.Agent
return return
} }
rtDependence.questionMsgID = input.ID rtDependence.SetQuestionMsgID(input.ID)
if rtDependence.agentInfo.BotMode == bot_common.BotMode_WorkflowMode { if rtDependence.GetAgentInfo().BotMode == bot_common.BotMode_WorkflowMode {
err = c.handlerWfAsAgentStreamExecute(ctx, sw, history, rtDependence) err = c.handlerWfAsAgentStreamExecute(ctx, sw, history, rtDependence)
} else { } else {
err = c.handlerAgentStreamExecute(ctx, sw, history, input, rtDependence) err = c.handlerAgentStreamExecute(ctx, sw, history, input, rtDependence)
@ -189,7 +220,7 @@ func (c *runImpl) handlerWfAsAgentStreamExecute(ctx context.Context, sw *schema.
if resumeInfo != nil { if resumeInfo != nil {
wfStreamer, err = crossworkflow.DefaultSVC().StreamResume(ctx, &crossworkflow.ResumeRequest{ wfStreamer, err = crossworkflow.DefaultSVC().StreamResume(ctx, &crossworkflow.ResumeRequest{
ResumeData: concatWfInput(rtDependence), ResumeData: concatWfInput(rtDependence),
EventID: 0, EventID: resumeInfo.ChatflowInterrupt.InterruptEvent.ID,
ExecuteID: resumeInfo.ChatflowInterrupt.ExecuteID, ExecuteID: resumeInfo.ChatflowInterrupt.ExecuteID,
}, executeConfig) }, executeConfig)
} else { } else {
@ -520,13 +551,19 @@ func (c *runImpl) pullWfStream(ctx context.Context, events *schema.StreamReader[
st, re := events.Recv() st, re := events.Recv()
if re != nil { if re != nil {
if errors.Is(re, io.EOF) { if errors.Is(re, io.EOF) {
// update usage
finishErr := c.handlerFinalAnswerFinish(ctx, sw, rtDependence)
if finishErr != nil {
logs.CtxErrorf(ctx, "handlerFinalAnswerFinish error: %v", finishErr)
return
}
return return
} }
logs.CtxErrorf(ctx, "pullWfStream Recv error: %v", re) logs.CtxErrorf(ctx, "pullWfStream Recv error: %v", re)
c.handlerErr(ctx, re, sw) c.handlerErr(ctx, re, sw)
return return
} }
if st == nil { if st == nil {
continue continue
} }
@ -536,7 +573,6 @@ func (c *runImpl) pullWfStream(ctx context.Context, events *schema.StreamReader[
OutputTokens: st.StateMessage.Usage.OutputTokens, OutputTokens: st.StateMessage.Usage.OutputTokens,
TotalCount: st.StateMessage.Usage.InputTokens + st.StateMessage.Usage.OutputTokens, TotalCount: st.StateMessage.Usage.InputTokens + st.StateMessage.Usage.OutputTokens,
} }
logs.CtxInfof(ctx, "pullWfStream usage:%v,err:%v", conv.DebugJsonToStr(usage), re)
} }
if st.StateMessage != nil && st.StateMessage.InterruptEvent != nil { // interrupt if st.StateMessage != nil && st.StateMessage.InterruptEvent != nil { // interrupt
@ -625,13 +661,6 @@ func (c *runImpl) handlerWfInterruptMsg(ctx context.Context, sw *schema.StreamWr
if err != nil { if err != nil {
return return
} }
finishErr := c.handlerFinalAnswerFinish(ctx, sw, rtDependence)
if finishErr != nil {
logs.CtxErrorf(ctx, "handlerFinalAnswerFinish error: %v", finishErr)
return
}
} }
func (c *runImpl) handlerWfInterruptEvent(_ context.Context, interruptEventData *crossworkflow.InterruptEvent) (string, message.ContentType, error) { func (c *runImpl) handlerWfInterruptEvent(_ context.Context, interruptEventData *crossworkflow.InterruptEvent) (string, message.ContentType, error) {

View File

@ -145,6 +145,10 @@ func (dao *MessageDAO) Edit(ctx context.Context, msgID int64, msg *message.Messa
if err != nil { if err != nil {
return 0, err return 0, err
} }
if do.RowsAffected == 0 {
return 0, errorx.New(errno.ErrRecordNotFound)
}
return do.RowsAffected, nil return do.RowsAffected, nil
} }

View File

@ -35,9 +35,17 @@ const (
ErrConversationMessageNotFound = 103200001 ErrConversationMessageNotFound = 103200001
ErrAgentRun = 103200002 ErrAgentRun = 103200002
ErrRecordNotFound = 103200003
) )
func init() { func init() {
code.Register(
ErrRecordNotFound,
"record not found or nothing to update",
code.WithAffectStability(false),
)
code.Register( code.Register(
ErrAgentRun, ErrAgentRun,
"Interal Server Error", "Interal Server Error",