Merge branch 'main' into feat/memory-orchestration-be

This commit is contained in:
Stream
2025-08-25 14:54:14 +08:00
566 changed files with 14876 additions and 4377 deletions

View File

@ -6,12 +6,14 @@ implementation details like tenant_id, app_id, etc.
"""
from collections.abc import Mapping
from datetime import UTC, datetime
from datetime import datetime
from enum import StrEnum
from typing import Any, Optional
from pydantic import BaseModel, Field
from libs.datetime_utils import naive_utc_now
class WorkflowType(StrEnum):
"""
@ -60,7 +62,7 @@ class WorkflowExecution(BaseModel):
Calculate elapsed time in seconds.
If workflow is not finished, use current time.
"""
end_time = self.finished_at or datetime.now(UTC).replace(tzinfo=None)
end_time = self.finished_at or naive_utc_now()
return (end_time - self.started_at).total_seconds()
@classmethod

View File

@ -22,7 +22,7 @@ class GraphRuntimeState(BaseModel):
#
# Note: Since the type of this field is `dict[str, Any]`, its values may not remain consistent
# after a serialization and deserialization round trip.
outputs: dict[str, Any] = {}
outputs: dict[str, Any] = Field(default_factory=dict)
node_run_steps: int = 0
"""node run steps"""

View File

@ -1,5 +1,5 @@
import uuid
from datetime import UTC, datetime
from datetime import datetime
from enum import Enum
from typing import Optional
@ -7,6 +7,7 @@ from pydantic import BaseModel, Field
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from libs.datetime_utils import naive_utc_now
class RouteNodeState(BaseModel):
@ -71,7 +72,7 @@ class RouteNodeState(BaseModel):
raise Exception(f"Invalid route status {run_result.status}")
self.node_run_result = run_result
self.finished_at = datetime.now(UTC).replace(tzinfo=None)
self.finished_at = naive_utc_now()
class RuntimeRouteState(BaseModel):
@ -89,7 +90,7 @@ class RuntimeRouteState(BaseModel):
:param node_id: node id
"""
state = RouteNodeState(node_id=node_id, start_at=datetime.now(UTC).replace(tzinfo=None))
state = RouteNodeState(node_id=node_id, start_at=naive_utc_now())
self.node_state_mapping[state.id] = state
return state

View File

@ -6,7 +6,6 @@ import uuid
from collections.abc import Generator, Mapping
from concurrent.futures import ThreadPoolExecutor, wait
from copy import copy, deepcopy
from datetime import UTC, datetime
from typing import Any, Optional, cast
from flask import Flask, current_app
@ -51,6 +50,7 @@ from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.end.end_stream_processor import EndStreamProcessor
from core.workflow.nodes.enums import ErrorStrategy, FailBranchSourceHandle
from core.workflow.nodes.event import RunCompletedEvent, RunRetrieverResourceEvent, RunStreamChunkEvent
from libs.datetime_utils import naive_utc_now
from libs.flask_utils import preserve_flask_contexts
from models.enums import UserFrom
from models.workflow import WorkflowType
@ -640,7 +640,7 @@ class GraphEngine:
while should_continue_retry and retries <= max_retries:
try:
# run node
retry_start_at = datetime.now(UTC).replace(tzinfo=None)
retry_start_at = naive_utc_now()
# yield control to other threads
time.sleep(0.001)
event_stream = node.run()

View File

@ -13,8 +13,9 @@ from core.agent.strategy.plugin import PluginAgentStrategy
from core.file import File, FileTransferMethod
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from core.model_runtime.entities.model_entities import AIModelEntity, ModelType
from core.model_runtime.utils.encoders import jsonable_encoder
from core.plugin.entities.request import InvokeCredentials
from core.plugin.impl.exc import PluginDaemonClientSideError
from core.plugin.impl.plugin import PluginInstaller
@ -558,7 +559,7 @@ class AgentNode(BaseNode):
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
if node_type == NodeType.AGENT:
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
llm_usage = LLMUsage.from_metadata(msg_metadata)
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))
agent_execution_metadata = {
WorkflowNodeExecutionMetadataKey(key): value
for key, value in msg_metadata.items()
@ -692,7 +693,13 @@ class AgentNode(BaseNode):
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={"text": text, "files": ArrayFileSegment(value=files), "json": json_output, **variables},
outputs={
"text": text,
"usage": jsonable_encoder(llm_usage),
"files": ArrayFileSegment(value=files),
"json": json_output,
**variables,
},
metadata={
**agent_execution_metadata,
WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,

View File

@ -12,6 +12,7 @@ from json_repair import repair_json
from configs import dify_config
from core.file import file_manager
from core.file.enums import FileTransferMethod
from core.helper import ssrf_proxy
from core.variables.segments import ArrayFileSegment, FileSegment
from core.workflow.entities.variable_pool import VariablePool
@ -228,7 +229,9 @@ class Executor:
files: dict[str, list[tuple[str | None, bytes, str]]] = {}
for key, files_in_segment in files_list:
for file in files_in_segment:
if file.related_id is not None:
if file.related_id is not None or (
file.transfer_method == FileTransferMethod.REMOTE_URL and file.remote_url is not None
):
file_tuple = (
file.filename,
file_manager.download(file),

View File

@ -4,7 +4,7 @@ import time
import uuid
from collections.abc import Generator, Mapping, Sequence
from concurrent.futures import Future, wait
from datetime import UTC, datetime
from datetime import datetime
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Optional, cast
@ -41,6 +41,7 @@ from core.workflow.nodes.enums import ErrorStrategy, NodeType
from core.workflow.nodes.event import NodeEvent, RunCompletedEvent
from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData
from factories.variable_factory import build_segment
from libs.datetime_utils import naive_utc_now
from libs.flask_utils import preserve_flask_contexts
from .exc import (
@ -179,7 +180,7 @@ class IterationNode(BaseNode):
thread_pool_id=self.thread_pool_id,
)
start_at = datetime.now(UTC).replace(tzinfo=None)
start_at = naive_utc_now()
yield IterationRunStartedEvent(
iteration_id=self.id,
@ -428,7 +429,7 @@ class IterationNode(BaseNode):
"""
run single iteration
"""
iter_start_at = datetime.now(UTC).replace(tzinfo=None)
iter_start_at = naive_utc_now()
try:
rst = graph_engine.run()
@ -505,7 +506,7 @@ class IterationNode(BaseNode):
variable_pool.add([self.node_id, "index"], next_index)
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds()
duration = (naive_utc_now() - iter_start_at).total_seconds()
iter_run_map[iteration_run_id] = duration
yield IterationRunNextEvent(
iteration_id=self.id,
@ -526,7 +527,7 @@ class IterationNode(BaseNode):
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds()
duration = (naive_utc_now() - iter_start_at).total_seconds()
iter_run_map[iteration_run_id] = duration
yield IterationRunNextEvent(
iteration_id=self.id,
@ -602,7 +603,7 @@ class IterationNode(BaseNode):
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds()
duration = (naive_utc_now() - iter_start_at).total_seconds()
iter_run_map[iteration_run_id] = duration
yield IterationRunNextEvent(
iteration_id=self.id,

View File

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Optional, cast
from sqlalchemy import Float, and_, func, or_, text
from sqlalchemy import cast as sqlalchemy_cast
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from core.app.app_config.entities import DatasetRetrieveConfigEntity
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
@ -175,7 +175,7 @@ class KnowledgeRetrievalNode(BaseNode):
redis_client.zremrangebyscore(key, 0, current_time - 60000)
request_count = redis_client.zcard(key)
if request_count > knowledge_rate_limit.limit:
with Session(db.engine) as session:
with sessionmaker(db.engine).begin() as session:
# add ratelimit record
rate_limit_log = RateLimitLog(
tenant_id=self.tenant_id,
@ -183,7 +183,6 @@ class KnowledgeRetrievalNode(BaseNode):
operation="knowledge",
)
session.add(rate_limit_log)
session.commit()
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=variables,
@ -389,6 +388,15 @@ class KnowledgeRetrievalNode(BaseNode):
"segment_id": segment.id,
"retriever_from": "workflow",
"score": record.score or 0.0,
"child_chunks": [
{
"id": str(getattr(chunk, "id", "")),
"content": str(getattr(chunk, "content", "")),
"position": int(getattr(chunk, "position", 0)),
"score": float(getattr(chunk, "score", 0.0)),
}
for chunk in (record.child_chunks or [])
],
"segment_hit_count": segment.hit_count,
"segment_word_count": segment.word_count,
"segment_position": segment.position,
@ -572,7 +580,7 @@ class KnowledgeRetrievalNode(BaseNode):
def _process_metadata_filter_func(
self, sequence: int, condition: str, metadata_name: str, value: Optional[Any], filters: list
):
if value is None:
if value is None and condition not in ("empty", "not empty"):
return
key = f"{metadata_name}_{sequence}"

View File

@ -13,7 +13,7 @@ class ModelConfig(BaseModel):
provider: str
name: str
mode: LLMMode
completion_params: dict[str, Any] = {}
completion_params: dict[str, Any] = Field(default_factory=dict)
class ContextConfig(BaseModel):

View File

@ -1,5 +1,4 @@
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Optional, cast
from sqlalchemy import select, update
@ -20,6 +19,7 @@ from core.variables.segments import ArrayAnySegment, ArrayFileSegment, FileSegme
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.nodes.llm.entities import ModelConfig
from libs.datetime_utils import naive_utc_now
from models import db
from models.model import Conversation
from models.provider import Provider, ProviderType
@ -149,7 +149,7 @@ def deduct_llm_quota(tenant_id: str, model_instance: ModelInstance, usage: LLMUs
)
.values(
quota_used=Provider.quota_used + used_quota,
last_used=datetime.now(tz=UTC).replace(tzinfo=None),
last_used=naive_utc_now(),
)
)
session.execute(stmt)

View File

@ -2,7 +2,7 @@ import json
import logging
import time
from collections.abc import Generator, Mapping, Sequence
from datetime import UTC, datetime
from datetime import datetime
from typing import TYPE_CHECKING, Any, Literal, Optional, cast
from configs import dify_config
@ -36,6 +36,7 @@ from core.workflow.nodes.event import NodeEvent, RunCompletedEvent
from core.workflow.nodes.loop.entities import LoopNodeData
from core.workflow.utils.condition.processor import ConditionProcessor
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from libs.datetime_utils import naive_utc_now
if TYPE_CHECKING:
from core.workflow.entities.variable_pool import VariablePool
@ -143,7 +144,7 @@ class LoopNode(BaseNode):
thread_pool_id=self.thread_pool_id,
)
start_at = datetime.now(UTC).replace(tzinfo=None)
start_at = naive_utc_now()
condition_processor = ConditionProcessor()
# Start Loop event
@ -171,7 +172,7 @@ class LoopNode(BaseNode):
try:
check_break_result = False
for i in range(loop_count):
loop_start_time = datetime.now(UTC).replace(tzinfo=None)
loop_start_time = naive_utc_now()
# run single loop
loop_result = yield from self._run_single_loop(
graph_engine=graph_engine,
@ -185,7 +186,7 @@ class LoopNode(BaseNode):
start_at=start_at,
inputs=inputs,
)
loop_end_time = datetime.now(UTC).replace(tzinfo=None)
loop_end_time = naive_utc_now()
single_loop_variable = {}
for key, selector in loop_variable_selectors.items():
@ -313,30 +314,31 @@ class LoopNode(BaseNode):
and event.node_type == NodeType.LOOP_END
and not isinstance(event, NodeRunStreamChunkEvent)
):
check_break_result = True
# Check if variables in break conditions exist and process conditions
# Allow loop internal variables to be used in break conditions
available_conditions = []
for condition in break_conditions:
variable = self.graph_runtime_state.variable_pool.get(condition.variable_selector)
if variable:
available_conditions.append(condition)
# Process conditions if at least one variable is available
if available_conditions:
input_conditions, group_result, check_break_result = condition_processor.process_conditions(
variable_pool=self.graph_runtime_state.variable_pool,
conditions=available_conditions,
operator=logical_operator,
)
if check_break_result:
break
else:
check_break_result = True
yield self._handle_event_metadata(event=event, iter_run_index=current_index)
break
if isinstance(event, NodeRunSucceededEvent):
yield self._handle_event_metadata(event=event, iter_run_index=current_index)
# Check if all variables in break conditions exist
exists_variable = False
for condition in break_conditions:
if not self.graph_runtime_state.variable_pool.get(condition.variable_selector):
exists_variable = False
break
else:
exists_variable = True
if exists_variable:
input_conditions, group_result, check_break_result = condition_processor.process_conditions(
variable_pool=self.graph_runtime_state.variable_pool,
conditions=break_conditions,
operator=logical_operator,
)
if check_break_result:
break
elif isinstance(event, BaseGraphEvent):
if isinstance(event, GraphRunFailedEvent):
# Loop run failed

View File

@ -1,3 +1,4 @@
import contextlib
import json
import logging
import uuid
@ -666,10 +667,8 @@ class ParameterExtractorNode(BaseNode):
if result[idx] == "{" or result[idx] == "[":
json_str = extract_json(result[idx:])
if json_str:
try:
with contextlib.suppress(Exception):
return cast(dict, json.loads(json_str))
except Exception:
pass
logger.info("extra error: %s", result)
return None
@ -686,10 +685,9 @@ class ParameterExtractorNode(BaseNode):
if result[idx] == "{" or result[idx] == "[":
json_str = extract_json(result[idx:])
if json_str:
try:
with contextlib.suppress(Exception):
return cast(dict, json.loads(json_str))
except Exception:
pass
logger.info("extra error: %s", result)
return None