Merge branch 'main' into feat/queue-based-graph-engine

This commit is contained in:
-LAN-
2025-09-16 12:59:26 +08:00
43 changed files with 226 additions and 344 deletions

View File

@@ -352,8 +352,8 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_FRAMEWORK: "dify",
TOOL_NAME: node_execution.title,
TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
- TOOL_PARAMETERS: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
- INPUT_VALUE: json.dumps(node_execution.inputs if node_execution.inputs else {}, ensure_ascii=False),
+ TOOL_PARAMETERS: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
+ INPUT_VALUE: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
},
status=self.get_workflow_node_status(node_execution),
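
The change here, repeated across the tracing integrations below, swaps the verbose conditional expression for Python's short-circuiting or. The two spellings are equivalent; a minimal sketch of the behavior they share (both also replace falsy-but-present values, not just None):

    inputs = None
    assert (inputs if inputs else {}) == (inputs or {}) == {}

    # Shared caveat: any falsy value falls through to the default, not just None.
    inputs = {}                          # present, but empty
    print(inputs or {"fallback": True})  # -> {'fallback': True}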

View File

@@ -144,13 +144,13 @@ class LangFuseDataTrace(BaseTraceInstance):
if node_type == NodeType.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
- inputs = node_execution.inputs if node_execution.inputs else {}
- outputs = node_execution.outputs if node_execution.outputs else {}
+ inputs = node_execution.inputs or {}
+ outputs = node_execution.outputs or {}
created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time)
- execution_metadata = node_execution.metadata if node_execution.metadata else {}
+ execution_metadata = node_execution.metadata or {}
metadata = {str(k): v for k, v in execution_metadata.items()}
metadata.update(
{
@@ -163,7 +163,7 @@ class LangFuseDataTrace(BaseTraceInstance):
"status": status,
}
)
- process_data = node_execution.process_data if node_execution.process_data else {}
+ process_data = node_execution.process_data or {}
model_provider = process_data.get("model_provider", None)
model_name = process_data.get("model_name", None)
if model_provider is not None and model_name is not None:
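
A side note on the unchanged lines in this hunk: the span's end time is derived from its start plus the elapsed seconds, with datetime.now() as the fallback start. A self-contained sketch of that arithmetic:

    from datetime import datetime, timedelta

    created_at = None           # e.g. node_execution.created_at was never recorded
    elapsed_time = 1.5          # seconds

    start = created_at or datetime.now()
    finished_at = start + timedelta(seconds=elapsed_time)
    print(finished_at - start)  # -> 0:00:01.500000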

View File

@@ -166,13 +166,13 @@ class LangSmithDataTrace(BaseTraceInstance):
if node_type == NodeType.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
- inputs = node_execution.inputs if node_execution.inputs else {}
- outputs = node_execution.outputs if node_execution.outputs else {}
+ inputs = node_execution.inputs or {}
+ outputs = node_execution.outputs or {}
created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time)
- execution_metadata = node_execution.metadata if node_execution.metadata else {}
+ execution_metadata = node_execution.metadata or {}
node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0
metadata = {str(key): value for key, value in execution_metadata.items()}
metadata.update(
@@ -187,7 +187,7 @@ class LangSmithDataTrace(BaseTraceInstance):
}
)
- process_data = node_execution.process_data if node_execution.process_data else {}
+ process_data = node_execution.process_data or {}
if process_data and process_data.get("model_mode") == "chat":
run_type = LangSmithRunType.llm
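
The kept line node_total_tokens = execution_metadata.get(...) or 0 is worth a second look: .get(key) or 0 is not the same as .get(key, 0) when the key exists but maps to None. A small sketch:

    meta = {"total_tokens": None}

    print(meta.get("total_tokens", 0))    # -> None: the default only covers a missing key
    print(meta.get("total_tokens") or 0)  # -> 0: also normalizes None (and other falsy values)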

View File

@@ -181,13 +181,13 @@ class OpikDataTrace(BaseTraceInstance):
if node_type == NodeType.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
- inputs = node_execution.inputs if node_execution.inputs else {}
- outputs = node_execution.outputs if node_execution.outputs else {}
+ inputs = node_execution.inputs or {}
+ outputs = node_execution.outputs or {}
created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time)
- execution_metadata = node_execution.metadata if node_execution.metadata else {}
+ execution_metadata = node_execution.metadata or {}
metadata = {str(k): v for k, v in execution_metadata.items()}
metadata.update(
{
@@ -201,7 +201,7 @@ class OpikDataTrace(BaseTraceInstance):
}
)
- process_data = node_execution.process_data if node_execution.process_data else {}
+ process_data = node_execution.process_data or {}
provider = None
model = None

View File

@@ -1,3 +1,4 @@
+ import collections
import json
import logging
import os
@@ -42,7 +43,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
- class OpsTraceProviderConfigMap(dict[str, dict[str, Any]]):
+ class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
def __getitem__(self, provider: str) -> dict[str, Any]:
match provider:
case TracingProviderEnum.LANGFUSE:
@@ -123,7 +124,7 @@ class OpsTraceProviderConfigMap(dict[str, dict[str, Any]]):
raise KeyError(f"Unsupported tracing provider: {provider}")
- provider_config_map: dict[str, dict[str, Any]] = OpsTraceProviderConfigMap()
+ provider_config_map = OpsTraceProviderConfigMap()
class OpsTraceManager:
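
The dict-to-UserDict switch is more than cosmetic: built-in dict methods like get() are implemented in C and bypass a Python-level __getitem__ override, while collections.UserDict's mixin methods route through it. A minimal sketch (the Loud* class names are hypothetical):

    import collections

    class LoudDict(dict):
        def __getitem__(self, key):
            return f"computed:{key}"

    class LoudUserDict(collections.UserDict):
        def __getitem__(self, key):
            return f"computed:{key}"

    print(LoudDict().get("langfuse"))      # -> None: dict.get ignores the override
    print(LoudUserDict().get("langfuse"))  # -> computed:langfuse

With the subclass parameterized as UserDict[str, dict[str, Any]], the explicit annotation on provider_config_map also becomes redundant, which is why the assignment loses it.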

View File

@@ -168,13 +168,13 @@ class WeaveDataTrace(BaseTraceInstance):
if node_type == NodeType.LLM:
inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
else:
- inputs = node_execution.inputs if node_execution.inputs else {}
- outputs = node_execution.outputs if node_execution.outputs else {}
+ inputs = node_execution.inputs or {}
+ outputs = node_execution.outputs or {}
created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time)
- execution_metadata = node_execution.metadata if node_execution.metadata else {}
+ execution_metadata = node_execution.metadata or {}
node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0
attributes = {str(k): v for k, v in execution_metadata.items()}
attributes.update(
@@ -189,7 +189,7 @@ class WeaveDataTrace(BaseTraceInstance):
}
)
- process_data = node_execution.process_data if node_execution.process_data else {}
+ process_data = node_execution.process_data or {}
if process_data and process_data.get("model_mode") == "chat":
attributes.update(
{

View File

@@ -641,7 +641,7 @@ class ClickzettaVector(BaseVector):
for doc, embedding in zip(batch_docs, batch_embeddings):
# Optimized: minimal checks for common case, fallback for edge cases
- metadata = doc.metadata if doc.metadata else {}
+ metadata = doc.metadata or {}
if not isinstance(metadata, dict):
metadata = {}
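
The isinstance fallback survives the rewrite for a reason: or only substitutes falsy values, so a truthy non-dict metadata would sail straight through the first line. Sketch:

    doc_metadata = "unexpected-string"  # truthy, but not a dict

    metadata = doc_metadata or {}       # still "unexpected-string"
    if not isinstance(metadata, dict):
        metadata = {}
    print(metadata)                     # -> {}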

View File

@@ -103,7 +103,7 @@ class MatrixoneVector(BaseVector):
self.client = self._get_client(len(embeddings[0]), True)
assert self.client is not None
ids = []
- for _, doc in enumerate(documents):
+ for doc in documents:
if doc.metadata is not None:
doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))
ids.append(doc_id)
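
Dropping enumerate() when the index is thrown away is a pure readability fix; the items and their order are untouched:

    documents = ["a", "b", "c"]

    with_index = [doc for _, doc in enumerate(documents)]
    direct = [doc for doc in documents]
    assert with_index == direct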

View File

@@ -104,7 +104,7 @@ class OpenSearchVector(BaseVector):
},
}
# See https://github.com/langchain-ai/langchainjs/issues/4346#issuecomment-1935123377
- if self._client_config.aws_service not in ["aoss"]:
+ if self._client_config.aws_service != "aoss":
action["_id"] = uuid4().hex
actions.append(action)
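
Per the linked langchainjs comment, OpenSearch Serverless (service name "aoss") rejects bulk requests that supply an explicit _id, so the id is only attached for the non-serverless service. A sketch of that conditional action building (build_action, index_name, and source are hypothetical stand-ins for the surrounding code):

    from uuid import uuid4

    def build_action(aws_service: str, index_name: str, source: dict) -> dict:
        action = {"_index": index_name, "_source": source}
        if aws_service != "aoss":  # serverless collections generate their own document ids
            action["_id"] = uuid4().hex
        return action

    print("_id" in build_action("aoss", "docs", {}))  # -> False
    print("_id" in build_action("es", "docs", {}))    # -> True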

View File

@@ -156,7 +156,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
else None
)
db_model.status = domain_model.status
- db_model.error = domain_model.error_message if domain_model.error_message else None
+ db_model.error = domain_model.error_message or None
db_model.total_tokens = domain_model.total_tokens
db_model.total_steps = domain_model.total_steps
db_model.exceptions_count = domain_model.exceptions_count
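
The same equivalent rewrite, and a useful property of both spellings when persisting: an empty error string collapses to None, so the column stores NULL rather than "". Sketch:

    for error_message in ("boom", "", None):
        print(repr(error_message or None))
    # -> 'boom'
    # -> None   (empty string becomes NULL-able None)
    # -> None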

View File

@@ -330,7 +330,7 @@ class AgentNode(Node):
memory = self._fetch_memory(model_instance)
if memory:
prompt_messages = memory.get_history_prompt_messages(
- message_limit=node_data.memory.window.size if node_data.memory.window.size else None
+ message_limit=node_data.memory.window.size or None
)
history_prompt_messages = [
prompt_message.model_dump(mode="json") for prompt_message in prompt_messages
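
Same shortening in AgentNode, with one behavioral note that holds for the old spelling too: a window size of 0 collapses to message_limit=None, so an explicit zero is indistinguishable from an unset limit here.

    window_size = 0                      # an explicit "no history" setting
    message_limit = window_size or None  # -> None: same result as leaving it unset
    print(message_limit)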