Merge branch 'feat/mcp-06-18' into deploy/dev

This commit is contained in:
Novice
2025-10-20 10:58:27 +08:00
463 changed files with 10928 additions and 7216 deletions

View File

@ -22,7 +22,7 @@ from core.memory.errors import MemorySyncTimeoutError
from core.model_runtime.entities.message_entities import PromptMessage
from core.variables.segments import VersionedMemoryValue
from core.workflow.constants import MEMORY_BLOCK_VARIABLE_NODE_ID
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.runtime.variable_pool import VariablePool
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models import App, CreatorUserRole
@ -38,9 +38,7 @@ logger = logging.getLogger(__name__)
class ChatflowMemoryService:
@staticmethod
def get_persistent_memories(
app: App,
created_by: MemoryCreatedBy,
version: int | None = None
app: App, created_by: MemoryCreatedBy, version: int | None = None
) -> Sequence[MemoryBlock]:
if created_by.account_id:
created_by_role = CreatorUserRole.ACCOUNT
@ -50,15 +48,20 @@ class ChatflowMemoryService:
created_by_id = created_by.id
if version is None:
# If version not specified, get the latest version
stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == None,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id,
stmt = (
select(ChatflowMemoryVariable)
.distinct(ChatflowMemoryVariable.memory_id)
.where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == None,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id,
)
)
).order_by(ChatflowMemoryVariable.version.desc())
.order_by(ChatflowMemoryVariable.version.desc())
)
else:
stmt = select(ChatflowMemoryVariable).where(
and_(
@ -67,7 +70,7 @@ class ChatflowMemoryService:
ChatflowMemoryVariable.conversation_id == None,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id,
ChatflowMemoryVariable.version == version
ChatflowMemoryVariable.version == version,
)
)
with Session(db.engine) as session:
@ -76,27 +79,29 @@ class ChatflowMemoryService:
@staticmethod
def get_session_memories(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None
) -> Sequence[MemoryBlock]:
if version is None:
# If version not specified, get the latest version
stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == conversation_id
stmt = (
select(ChatflowMemoryVariable)
.distinct(ChatflowMemoryVariable.memory_id)
.where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == conversation_id,
)
)
).order_by(ChatflowMemoryVariable.version.desc())
.order_by(ChatflowMemoryVariable.version.desc())
)
else:
stmt = select(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == conversation_id,
ChatflowMemoryVariable.version == version
ChatflowMemoryVariable.version == version,
)
)
with Session(db.engine) as session:
@ -123,10 +128,7 @@ class ChatflowMemoryService:
node_id=memory.node_id,
conversation_id=memory.conversation_id,
name=memory.spec.name,
value=MemoryValueData(
value=memory.value,
edited_by_user=memory.edited_by_user
).model_dump_json(),
value=MemoryValueData(value=memory.value, edited_by_user=memory.edited_by_user).model_dump_json(),
term=memory.spec.term,
scope=memory.spec.scope,
version=memory.version, # Use version from MemoryBlock directly
@ -141,21 +143,22 @@ class ChatflowMemoryService:
draft_var_service = WorkflowDraftVariableService(session)
memory_selector = memory.spec.id if not memory.node_id else f"{memory.node_id}.{memory.spec.id}"
existing_vars = draft_var_service.get_draft_variables_by_selectors(
app_id=memory.app_id,
selectors=[[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_selector]]
app_id=memory.app_id, selectors=[[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_selector]]
)
if existing_vars:
draft_var = existing_vars[0]
draft_var.value = VersionedMemoryValue.model_validate_json(draft_var.value)\
.add_version(memory.value)\
draft_var.value = (
VersionedMemoryValue.model_validate_json(draft_var.value)
.add_version(memory.value)
.model_dump_json()
)
else:
draft_var = WorkflowDraftVariable.new_memory_block_variable(
app_id=memory.app_id,
memory_id=memory.spec.id,
name=memory.spec.name,
value=VersionedMemoryValue().add_version(memory.value),
description=memory.spec.description
description=memory.spec.description,
)
session.add(draft_var)
session.commit()
@ -163,15 +166,19 @@ class ChatflowMemoryService:
@staticmethod
def get_memories_by_specs(
memory_block_specs: Sequence[MemoryBlockSpec],
tenant_id: str, app_id: str,
tenant_id: str,
app_id: str,
created_by: MemoryCreatedBy,
conversation_id: Optional[str],
node_id: Optional[str],
is_draft: bool
is_draft: bool,
) -> Sequence[MemoryBlock]:
return [ChatflowMemoryService.get_memory_by_spec(
spec, tenant_id, app_id, created_by, conversation_id, node_id, is_draft
) for spec in memory_block_specs]
return [
ChatflowMemoryService.get_memory_by_spec(
spec, tenant_id, app_id, created_by, conversation_id, node_id, is_draft
)
for spec in memory_block_specs
]
@staticmethod
def get_memory_by_spec(
@ -181,17 +188,17 @@ class ChatflowMemoryService:
created_by: MemoryCreatedBy,
conversation_id: Optional[str],
node_id: Optional[str],
is_draft: bool
is_draft: bool,
) -> MemoryBlock:
with Session(db.engine) as session:
if is_draft:
draft_var_service = WorkflowDraftVariableService(session)
selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"] \
if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id]
draft_vars = draft_var_service.get_draft_variables_by_selectors(
app_id=app_id,
selectors=[selector]
selector = (
[MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"]
if node_id
else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id]
)
draft_vars = draft_var_service.get_draft_variables_by_selectors(app_id=app_id, selectors=[selector])
if draft_vars:
draft_var = draft_vars[0]
return MemoryBlock(
@ -204,17 +211,21 @@ class ChatflowMemoryService:
created_by=created_by,
version=1,
)
stmt = select(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.memory_id == spec.id,
ChatflowMemoryVariable.tenant_id == tenant_id,
ChatflowMemoryVariable.app_id == app_id,
ChatflowMemoryVariable.node_id ==
(node_id if spec.scope == MemoryScope.NODE else None),
ChatflowMemoryVariable.conversation_id ==
(conversation_id if spec.term == MemoryTerm.SESSION else None),
stmt = (
select(ChatflowMemoryVariable)
.where(
and_(
ChatflowMemoryVariable.memory_id == spec.id,
ChatflowMemoryVariable.tenant_id == tenant_id,
ChatflowMemoryVariable.app_id == app_id,
ChatflowMemoryVariable.node_id == (node_id if spec.scope == MemoryScope.NODE else None),
ChatflowMemoryVariable.conversation_id
== (conversation_id if spec.term == MemoryTerm.SESSION else None),
)
)
).order_by(ChatflowMemoryVariable.version.desc()).limit(1)
.order_by(ChatflowMemoryVariable.version.desc())
.limit(1)
)
result = session.execute(stmt).scalar()
if result:
memory_value_data = MemoryValueData.model_validate_json(result.value)
@ -246,7 +257,7 @@ class ChatflowMemoryService:
conversation_id: str,
variable_pool: VariablePool,
created_by: MemoryCreatedBy,
is_draft: bool
is_draft: bool,
):
visible_messages = ChatflowHistoryService.get_visible_chat_history(
conversation_id=conversation_id,
@ -294,7 +305,7 @@ class ChatflowMemoryService:
conversation_id=conversation_id,
app_id=workflow.app_id,
visible_messages=visible_messages,
variable_pool=variable_pool
variable_pool=variable_pool,
)
@staticmethod
@ -306,7 +317,7 @@ class ChatflowMemoryService:
conversation_id: str,
memory_block_spec: MemoryBlockSpec,
variable_pool: VariablePool,
is_draft: bool
is_draft: bool,
) -> bool:
visible_messages = ChatflowHistoryService.get_visible_chat_history(
conversation_id=conversation_id,
@ -323,10 +334,7 @@ class ChatflowMemoryService:
is_draft=is_draft,
created_by=created_by,
)
if not ChatflowMemoryService._should_update_memory(
memory_block=memory_block,
visible_history=visible_messages
):
if not ChatflowMemoryService._should_update_memory(memory_block=memory_block, visible_history=visible_messages):
return False
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
@ -336,7 +344,7 @@ class ChatflowMemoryService:
memory_block=memory_block,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
conversation_id=conversation_id,
)
else:
# Node-level async: execute asynchronously
@ -345,7 +353,7 @@ class ChatflowMemoryService:
visible_messages=visible_messages,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
conversation_id=conversation_id,
)
return True
@ -355,7 +363,8 @@ class ChatflowMemoryService:
memory_blocks = workflow.memory_blocks
sync_memory_blocks = [
block for block in memory_blocks
block
for block in memory_blocks
if block.scope == MemoryScope.APP and block.schedule_mode == MemoryScheduleMode.SYNC
]
@ -378,16 +387,11 @@ class ChatflowMemoryService:
time.sleep(retry_interval)
else:
# Maximum retry attempts reached, raise exception
raise MemorySyncTimeoutError(
app_id=workflow.app_id,
conversation_id=conversation_id
)
raise MemorySyncTimeoutError(app_id=workflow.app_id, conversation_id=conversation_id)
@staticmethod
def _convert_to_memory_blocks(
app: App,
created_by: MemoryCreatedBy,
raw_results: Sequence[ChatflowMemoryVariable]
app: App, created_by: MemoryCreatedBy, raw_results: Sequence[ChatflowMemoryVariable]
) -> Sequence[MemoryBlock]:
workflow = WorkflowService().get_published_workflow(app)
if not workflow:
@ -395,8 +399,7 @@ class ChatflowMemoryService:
results = []
for chatflow_memory_variable in raw_results:
spec = next(
(spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id),
None
(spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id), None
)
if spec and chatflow_memory_variable.app_id:
memory_value_data = MemoryValueData.model_validate_json(chatflow_memory_variable.value)
@ -416,10 +419,7 @@ class ChatflowMemoryService:
return results
@staticmethod
def _should_update_memory(
memory_block: MemoryBlock,
visible_history: Sequence[PromptMessage]
) -> bool:
def _should_update_memory(memory_block: MemoryBlock, visible_history: Sequence[PromptMessage]) -> bool:
return len(visible_history) >= memory_block.spec.update_turns
@staticmethod
@ -428,16 +428,16 @@ class ChatflowMemoryService:
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool
is_draft: bool,
):
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id
"memory_block": block,
"visible_messages": visible_messages,
"variable_pool": variable_pool,
"is_draft": is_draft,
"conversation_id": conversation_id,
},
)
thread.start()
@ -449,18 +449,18 @@ class ChatflowMemoryService:
conversation_id: str,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
is_draft: bool
is_draft: bool,
):
"""Submit sync memory batch update task"""
thread = threading.Thread(
target=ChatflowMemoryService._batch_update_sync_memory,
kwargs={
'sync_blocks': sync_blocks,
'app_id': app_id,
'conversation_id': conversation_id,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft
"sync_blocks": sync_blocks,
"app_id": app_id,
"conversation_id": conversation_id,
"visible_messages": visible_messages,
"variable_pool": variable_pool,
"is_draft": is_draft,
},
)
thread.start()
@ -472,7 +472,7 @@ class ChatflowMemoryService:
conversation_id: str,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
is_draft: bool
is_draft: bool,
):
try:
lock_key = _get_memory_sync_lock_key(app_id, conversation_id)
@ -482,11 +482,11 @@ class ChatflowMemoryService:
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id,
"memory_block": block,
"visible_messages": visible_messages,
"variable_pool": variable_pool,
"is_draft": is_draft,
"conversation_id": conversation_id,
},
)
threads.append(thread)
@ -503,14 +503,14 @@ class ChatflowMemoryService:
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool
is_draft: bool,
):
ChatflowMemoryService._perform_memory_update(
memory_block=memory_block,
visible_messages=visible_messages,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
conversation_id=conversation_id,
)
@staticmethod
@ -519,18 +519,18 @@ class ChatflowMemoryService:
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool = False
is_draft: bool = False,
):
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': memory_block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id,
"memory_block": memory_block,
"visible_messages": visible_messages,
"variable_pool": variable_pool,
"is_draft": is_draft,
"conversation_id": conversation_id,
},
daemon=True
daemon=True,
)
thread.start()
@ -540,7 +540,7 @@ class ChatflowMemoryService:
variable_pool: VariablePool,
conversation_id: str,
visible_messages: Sequence[PromptMessage],
is_draft: bool
is_draft: bool,
):
updated_value = LLMGenerator.update_memory_block(
tenant_id=memory_block.tenant_id,
@ -567,7 +567,7 @@ class ChatflowMemoryService:
node_id=memory_block.node_id,
new_visible_count=memory_block.spec.preserved_turns,
app_id=memory_block.app_id,
tenant_id=memory_block.tenant_id
tenant_id=memory_block.tenant_id,
)
@staticmethod
@ -594,7 +594,7 @@ class ChatflowMemoryService:
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.memory_id == memory_id,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id
ChatflowMemoryVariable.created_by == created_by_id,
)
)
session.execute(stmt)
@ -615,7 +615,7 @@ class ChatflowMemoryService:
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id
ChatflowMemoryVariable.created_by == created_by_id,
)
)
session.execute(stmt)
@ -623,38 +623,28 @@ class ChatflowMemoryService:
@staticmethod
def get_persistent_memories_with_conversation(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
"""Get persistent memories with conversation metadata (always None for persistent)"""
memory_blocks = ChatflowMemoryService.get_persistent_memories(app, created_by, version)
return [
MemoryBlockWithConversation.from_memory_block(
block,
ChatflowHistoryService.get_conversation_metadata(
app.tenant_id, app.id, conversation_id, block.node_id
)
ChatflowHistoryService.get_conversation_metadata(app.tenant_id, app.id, conversation_id, block.node_id),
)
for block in memory_blocks
]
@staticmethod
def get_session_memories_with_conversation(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
app: App, created_by: MemoryCreatedBy, conversation_id: str, version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
"""Get session memories with conversation metadata"""
memory_blocks = ChatflowMemoryService.get_session_memories(app, created_by, conversation_id, version)
return [
MemoryBlockWithConversation.from_memory_block(
block,
ChatflowHistoryService.get_conversation_metadata(
app.tenant_id, app.id, conversation_id, block.node_id
)
ChatflowHistoryService.get_conversation_metadata(app.tenant_id, app.id, conversation_id, block.node_id),
)
for block in memory_blocks
]