Merge remote-tracking branch 'origin/main' into feat/queue-based-graph-engine

This commit is contained in:
-LAN-
2025-08-29 17:01:42 +08:00
74 changed files with 1595 additions and 145 deletions

View File

@ -32,6 +32,65 @@ class TokenBufferMemory:
self.conversation = conversation
self.model_instance = model_instance
def _build_prompt_message_with_files(
self, message_files: list[MessageFile], text_content: str, message: Message, app_record, is_user_message: bool
) -> PromptMessage:
"""
Build prompt message with files.
:param message_files: list of MessageFile objects
:param text_content: text content of the message
:param message: Message object
:param app_record: app record
:param is_user_message: whether this is a user message
:return: PromptMessage
"""
if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}:
file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config)
elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow_run = db.session.scalar(select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id))
if not workflow_run:
raise ValueError(f"Workflow run not found: {message.workflow_run_id}")
workflow = db.session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id))
if not workflow:
raise ValueError(f"Workflow not found: {workflow_run.workflow_id}")
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
else:
raise AssertionError(f"Invalid app mode: {self.conversation.mode}")
detail = ImagePromptMessageContent.DETAIL.HIGH
if file_extra_config and app_record:
# Build files directly without filtering by belongs_to
file_objs = [
file_factory.build_from_message_file(
message_file=message_file, tenant_id=app_record.tenant_id, config=file_extra_config
)
for message_file in message_files
]
if file_extra_config.image_config and file_extra_config.image_config.detail:
detail = file_extra_config.image_config.detail
else:
file_objs = []
if not file_objs:
if is_user_message:
return UserPromptMessage(content=text_content)
else:
return AssistantPromptMessage(content=text_content)
else:
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in file_objs:
prompt_message = file_manager.to_prompt_message_content(
file,
image_detail_config=detail,
)
prompt_message_contents.append(prompt_message)
prompt_message_contents.append(TextPromptMessageContent(data=text_content))
if is_user_message:
return UserPromptMessage(content=prompt_message_contents)
else:
return AssistantPromptMessage(content=prompt_message_contents)
def get_history_prompt_messages(
self, max_token_limit: int = 2000, message_limit: Optional[int] = None
) -> Sequence[PromptMessage]:
@ -40,82 +99,73 @@ class TokenBufferMemory:
:param max_token_limit: max token limit
:param message_limit: message limit
"""
with Session(db.engine) as session:
app_record = self.conversation.app
app_record = self.conversation.app
# fetch limited messages, and return reversed
stmt = (
select(Message)
.where(Message.conversation_id == self.conversation.id)
.order_by(Message.created_at.desc())
# fetch limited messages, and return reversed
stmt = (
select(Message).where(Message.conversation_id == self.conversation.id).order_by(Message.created_at.desc())
)
if message_limit and message_limit > 0:
message_limit = min(message_limit, 500)
else:
message_limit = 500
stmt = stmt.limit(message_limit)
messages = db.session.scalars(stmt).all()
# instead of all messages from the conversation, we only need to extract messages
# that belong to the thread of last message
thread_messages = extract_thread_messages(messages)
# for newly created message, its answer is temporarily empty, we don't need to add it to memory
if thread_messages and not thread_messages[0].answer and thread_messages[0].answer_tokens == 0:
thread_messages.pop(0)
messages = list(reversed(thread_messages))
prompt_messages: list[PromptMessage] = []
for message in messages:
# Process user message with files
user_files = (
db.session.query(MessageFile)
.where(
MessageFile.message_id == message.id,
(MessageFile.belongs_to == "user") | (MessageFile.belongs_to.is_(None)),
)
.all()
)
if message_limit and message_limit > 0:
message_limit = min(message_limit, 500)
if user_files:
user_prompt_message = self._build_prompt_message_with_files(
message_files=user_files,
text_content=message.query,
message=message,
app_record=app_record,
is_user_message=True,
)
prompt_messages.append(user_prompt_message)
else:
message_limit = 500
prompt_messages.append(UserPromptMessage(content=message.query))
stmt = stmt.limit(message_limit)
messages = session.scalars(stmt).all()
# instead of all messages from the conversation, we only need to extract messages
# that belong to the thread of last message
thread_messages = extract_thread_messages(messages)
# for newly created message, its answer is temporarily empty, we don't need to add it to memory
if thread_messages and not thread_messages[0].answer and thread_messages[0].answer_tokens == 0:
thread_messages.pop(0)
messages = list(reversed(thread_messages))
prompt_messages: list[PromptMessage] = []
for message in messages:
files = session.query(MessageFile).where(MessageFile.message_id == message.id).all()
if files:
file_extra_config = None
if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}:
file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config)
elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow_run = session.scalar(
select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id)
)
if not workflow_run:
raise ValueError(f"Workflow run not found: {message.workflow_run_id}")
workflow = session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id))
if not workflow:
raise ValueError(f"Workflow not found: {workflow_run.workflow_id}")
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
else:
raise AssertionError(f"Invalid app mode: {self.conversation.mode}")
detail = ImagePromptMessageContent.DETAIL.LOW
if file_extra_config and app_record:
file_objs = file_factory.build_from_message_files(
message_files=files, tenant_id=app_record.tenant_id, config=file_extra_config
)
if file_extra_config.image_config and file_extra_config.image_config.detail:
detail = file_extra_config.image_config.detail
else:
file_objs = []
if not file_objs:
prompt_messages.append(UserPromptMessage(content=message.query))
else:
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in file_objs:
prompt_message = file_manager.to_prompt_message_content(
file,
image_detail_config=detail,
)
prompt_message_contents.append(prompt_message)
prompt_message_contents.append(TextPromptMessageContent(data=message.query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
prompt_messages.append(UserPromptMessage(content=message.query))
# Process assistant message with files
assistant_files = (
db.session.query(MessageFile)
.where(MessageFile.message_id == message.id, MessageFile.belongs_to == "assistant")
.all()
)
if assistant_files:
assistant_prompt_message = self._build_prompt_message_with_files(
message_files=assistant_files,
text_content=message.answer,
message=message,
app_record=app_record,
is_user_message=False,
)
prompt_messages.append(assistant_prompt_message)
else:
prompt_messages.append(AssistantPromptMessage(content=message.answer))
if not prompt_messages: