Mirror of https://github.com/langgenius/dify.git (synced 2026-05-01 07:58:02 +08:00)

Compare commits: copilot/fi... → deploy/rag (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 6bf55e1cba | |
| | d0e9fccc9d | |
| | 74d938a8d2 | |
| | 4cebaa331e | |
| | b55c354139 | |
| | 500836ba25 | |
| | 4174462190 | |
@@ -408,9 +408,6 @@ SSRF_DEFAULT_TIME_OUT=5
SSRF_DEFAULT_CONNECT_TIME_OUT=5
SSRF_DEFAULT_READ_TIME_OUT=5
SSRF_DEFAULT_WRITE_TIME_OUT=5
SSRF_POOL_MAX_CONNECTIONS=100
SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS=20
SSRF_POOL_KEEPALIVE_EXPIRY=5.0

BATCH_UPLOAD_LIMIT=10
KEYWORD_DATA_SOURCE_TYPE=database

@@ -421,10 +418,6 @@ WORKFLOW_FILE_UPLOAD_LIMIT=10
# CODE EXECUTION CONFIGURATION
CODE_EXECUTION_ENDPOINT=http://127.0.0.1:8194
CODE_EXECUTION_API_KEY=dify-sandbox
CODE_EXECUTION_SSL_VERIFY=True
CODE_EXECUTION_POOL_MAX_CONNECTIONS=100
CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS=20
CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY=5.0
CODE_MAX_NUMBER=9223372036854775807
CODE_MIN_NUMBER=-9223372036854775808
CODE_MAX_STRING_LENGTH=80000

@@ -468,6 +461,7 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800

# GraphEngine Worker Pool Configuration
@@ -18,3 +18,18 @@ class EnterpriseFeatureConfig(BaseSettings):
        description="Allow customization of the enterprise logo.",
        default=False,
    )

    UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN: str = Field(
        description="Token for uploading knowledge pipeline template.",
        default="",
    )

    KNOWLEDGE_PIPELINE_TEMPLATE_COPYRIGHT: str = Field(
        description="Knowledge pipeline template copyright.",
        default="Copyright 2023 Dify",
    )

    KNOWLEDGE_PIPELINE_TEMPLATE_PRIVACY_POLICY: str = Field(
        description="Knowledge pipeline template privacy policy.",
        default="https://dify.ai",
    )
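For orientation only, a minimal self-contained sketch of how pydantic-settings populates fields like these from environment variables; the class name and env value below are illustrative, not part of this change:

```python
import os

from pydantic import Field
from pydantic_settings import BaseSettings


class ExampleEnterpriseConfig(BaseSettings):
    # Illustrative subset of the fields added above.
    UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN: str = Field(
        description="Token for uploading knowledge pipeline template.",
        default="",
    )


os.environ["UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN"] = "example-token"  # placeholder value
print(ExampleEnterpriseConfig().UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN)  # -> example-token
```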
@@ -113,21 +113,6 @@ class CodeExecutionSandboxConfig(BaseSettings):
        default=10.0,
    )

    CODE_EXECUTION_POOL_MAX_CONNECTIONS: PositiveInt = Field(
        description="Maximum number of concurrent connections for the code execution HTTP client",
        default=100,
    )

    CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS: PositiveInt = Field(
        description="Maximum number of persistent keep-alive connections for the code execution HTTP client",
        default=20,
    )

    CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY: PositiveFloat | None = Field(
        description="Keep-alive expiry in seconds for idle connections (set to None to disable)",
        default=5.0,
    )

    CODE_MAX_NUMBER: PositiveInt = Field(
        description="Maximum allowed numeric value in code execution",
        default=9223372036854775807,

@@ -168,11 +153,6 @@ class CodeExecutionSandboxConfig(BaseSettings):
        default=1000,
    )

    CODE_EXECUTION_SSL_VERIFY: bool = Field(
        description="Enable or disable SSL verification for code execution requests",
        default=True,
    )


class PluginConfig(BaseSettings):
    """
@@ -424,21 +404,6 @@ class HttpConfig(BaseSettings):
        default=5,
    )

    SSRF_POOL_MAX_CONNECTIONS: PositiveInt = Field(
        description="Maximum number of concurrent connections for the SSRF HTTP client",
        default=100,
    )

    SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS: PositiveInt = Field(
        description="Maximum number of persistent keep-alive connections for the SSRF HTTP client",
        default=20,
    )

    SSRF_POOL_KEEPALIVE_EXPIRY: PositiveFloat | None = Field(
        description="Keep-alive expiry in seconds for idle SSRF connections (set to None to disable)",
        default=5.0,
    )

    RESPECT_XFORWARD_HEADERS_ENABLED: bool = Field(
        description="Enable handling of X-Forwarded-For, X-Forwarded-Proto, and X-Forwarded-Port headers"
        " when the app is behind a single trusted reverse proxy.",

@@ -577,6 +542,11 @@ class WorkflowConfig(BaseSettings):
        default=5,
    )

    WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field(
        description="Maximum allowed depth for nested parallel executions",
        default=3,
    )

    MAX_VARIABLE_SIZE: PositiveInt = Field(
        description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.",
        default=200 * 1024,
@@ -9,6 +9,7 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
from configs import dify_config
from controllers.console import api, console_ns
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.wraps import get_app_model

@@ -796,6 +797,24 @@ class ConvertToWorkflowApi(Resource):
        }


@console_ns.route("/apps/<uuid:app_id>/workflows/draft/config")
class WorkflowConfigApi(Resource):
    """Resource for workflow configuration."""

    @api.doc("get_workflow_config")
    @api.doc(description="Get workflow configuration")
    @api.doc(params={"app_id": "Application ID"})
    @api.response(200, "Workflow configuration retrieved successfully")
    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def get(self, app_model: App):
        return {
            "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
        }


@console_ns.route("/apps/<uuid:app_id>/workflows")
class PublishedAllWorkflowApi(Resource):
    @api.doc("get_all_published_workflows")
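As a usage illustration only, the new draft-config route could be exercised from an authenticated console session roughly as below; the host, console API prefix, app ID, and token are placeholders, not values defined by this change:

```python
import httpx

app_id = "11111111-2222-3333-4444-555555555555"  # placeholder application ID
resp = httpx.get(
    f"http://localhost:5001/console/api/apps/{app_id}/workflows/draft/config",  # prefix assumed
    headers={"Authorization": "Bearer <console-access-token>"},  # placeholder credentials
)
resp.raise_for_status()
print(resp.json())  # expected shape: {"parallel_depth_limit": 3}
```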
@@ -14,7 +14,10 @@ from controllers.console.wraps import (
from extensions.ext_database import db
from libs.login import login_required
from models.dataset import PipelineCustomizedTemplate
from services.entities.knowledge_entities.rag_pipeline_entities import PipelineTemplateInfoEntity
from services.entities.knowledge_entities.rag_pipeline_entities import (
    PipelineBuiltInTemplateEntity,
    PipelineTemplateInfoEntity,
)
from services.rag_pipeline.rag_pipeline import RagPipelineService

logger = logging.getLogger(__name__)

@@ -26,12 +29,6 @@ def _validate_name(name):
    return name


def _validate_description_length(description):
    if len(description) > 400:
        raise ValueError("Description cannot exceed 400 characters.")
    return description


class PipelineTemplateListApi(Resource):
    @setup_required
    @login_required

@@ -146,6 +143,186 @@ class PublishCustomizedPipelineTemplateApi(Resource):
        return {"result": "success"}


class PipelineTemplateInstallApi(Resource):
    """API endpoint for installing built-in pipeline templates"""

    def post(self):
        """
        Install a built-in pipeline template

        Args:
            template_id: The template ID from URL parameter

        Returns:
            Success response or error with appropriate HTTP status
        """
        try:
            # Extract and validate Bearer token
            auth_token = self._extract_bearer_token()

            # Parse and validate request parameters
            template_args = self._parse_template_args()

            # Process uploaded template file
            file_content = self._process_template_file()

            # Create template entity
            pipeline_built_in_template_entity = PipelineBuiltInTemplateEntity(**template_args)

            # Install the template
            rag_pipeline_service = RagPipelineService()
            rag_pipeline_service.install_built_in_pipeline_template(
                pipeline_built_in_template_entity, file_content, auth_token
            )

            return {"result": "success", "message": "Template installed successfully"}, 200

        except ValueError as e:
            logger.exception("Validation error in template installation")
            return {"error": str(e)}, 400
        except Exception as e:
            logger.exception("Unexpected error in template installation")
            return {"error": "An unexpected error occurred during template installation"}, 500

    def _extract_bearer_token(self) -> str:
        """
        Extract and validate Bearer token from Authorization header

        Returns:
            The extracted token string

        Raises:
            ValueError: If token is missing or invalid
        """
        auth_header = request.headers.get("Authorization", "").strip()

        if not auth_header:
            raise ValueError("Authorization header is required")

        if not auth_header.startswith("Bearer "):
            raise ValueError("Authorization header must start with 'Bearer '")

        token_parts = auth_header.split(" ", 1)
        if len(token_parts) != 2:
            raise ValueError("Invalid Authorization header format")

        auth_token = token_parts[1].strip()
        if not auth_token:
            raise ValueError("Bearer token cannot be empty")

        return auth_token

    def _parse_template_args(self) -> dict:
        """
        Parse and validate template arguments from form data

        Args:
            template_id: The template ID from URL

        Returns:
            Dictionary of validated template arguments
        """
        # Use reqparse for consistent parameter parsing
        parser = reqparse.RequestParser()

        parser.add_argument(
            "template_id",
            type=str,
            location="form",
            required=False,
            help="Template ID for updating existing template"
        )
        parser.add_argument(
            "language",
            type=str,
            location="form",
            required=True,
            default="en-US",
            choices=["en-US", "zh-CN", "ja-JP"],
            help="Template language code"
        )
        parser.add_argument(
            "name",
            type=str,
            location="form",
            required=True,
            default="New Pipeline Template",
            help="Template name (1-200 characters)"
        )
        parser.add_argument(
            "description",
            type=str,
            location="form",
            required=False,
            default="",
            help="Template description (max 1000 characters)"
        )

        args = parser.parse_args()

        # Additional validation
        if args.get("name"):
            args["name"] = self._validate_name(args["name"])

        if args.get("description") and len(args["description"]) > 1000:
            raise ValueError("Description must not exceed 1000 characters")

        # Filter out None values
        return {k: v for k, v in args.items() if v is not None}

    def _validate_name(self, name: str) -> str:
        """
        Validate template name

        Args:
            name: Template name to validate

        Returns:
            Validated and trimmed name

        Raises:
            ValueError: If name is invalid
        """
        name = name.strip()
        if not name or len(name) < 1 or len(name) > 200:
            raise ValueError("Template name must be between 1 and 200 characters")
        return name

    def _process_template_file(self) -> str:
        """
        Process and validate uploaded template file

        Returns:
            File content as string

        Raises:
            ValueError: If file is missing or invalid
        """
        if "file" not in request.files:
            raise ValueError("Template file is required")

        file = request.files["file"]

        # Validate file
        if not file or not file.filename:
            raise ValueError("No file selected")

        filename = file.filename.strip()
        if not filename:
            raise ValueError("File name cannot be empty")

        # Check file extension
        if not filename.lower().endswith(".pipeline"):
            raise ValueError("Template file must be a pipeline file (.pipeline)")

        try:
            file_content = file.read().decode("utf-8")
        except UnicodeDecodeError:
            raise ValueError("Template file must be valid UTF-8 text")

        return file_content


api.add_resource(
    PipelineTemplateListApi,
    "/rag/pipeline/templates",

@@ -162,3 +339,7 @@ api.add_resource(
    PublishCustomizedPipelineTemplateApi,
    "/rag/pipelines/<string:pipeline_id>/customized/publish",
)
api.add_resource(
    PipelineTemplateInstallApi,
    "/rag/pipeline/built-in/templates/install",
)
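For illustration, an install request against the newly registered route might look like the following multipart POST; the host, API prefix, token, and template file name are placeholders, not values defined by this change:

```python
import httpx

# Hypothetical call to the new install endpoint; adjust host/prefix to your deployment.
with open("example.pipeline", "rb") as f:
    resp = httpx.post(
        "http://localhost:5001/console/api/rag/pipeline/built-in/templates/install",
        headers={"Authorization": "Bearer <upload-template-token>"},  # placeholder token
        data={"language": "en-US", "name": "Demo Template", "description": "Example"},
        files={"file": ("example.pipeline", f)},  # must be a .pipeline file per the validation above
    )
print(resp.status_code, resp.json())
```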
@@ -9,6 +9,7 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
from configs import dify_config
from controllers.console import api
from controllers.console.app.error import (
    ConversationCompletedError,

@@ -608,6 +609,18 @@ class DefaultRagPipelineBlockConfigApi(Resource):
        return rag_pipeline_service.get_default_block_config(node_type=block_type, filters=filters)


class RagPipelineConfigApi(Resource):
    """Resource for rag pipeline configuration."""

    @setup_required
    @login_required
    @account_initialization_required
    def get(self, pipeline_id):
        return {
            "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
        }


class PublishedAllRagPipelineApi(Resource):
    @setup_required
    @login_required

@@ -972,6 +985,10 @@ api.add_resource(
    DraftRagPipelineApi,
    "/rag/pipelines/<uuid:pipeline_id>/workflows/draft",
)
api.add_resource(
    RagPipelineConfigApi,
    "/rag/pipelines/<uuid:pipeline_id>/workflows/draft/config",
)
api.add_resource(
    DraftRagPipelineRunApi,
    "/rag/pipelines/<uuid:pipeline_id>/workflows/draft/run",
@@ -30,6 +30,7 @@ from extensions.ext_database import db
from fields.document_fields import document_fields, document_status_fields
from libs.login import current_user
from models.dataset import Dataset, Document, DocumentSegment
from models.model import EndUser
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig
from services.file_service import FileService

@@ -310,6 +311,8 @@ class DocumentAddByFileApi(DatasetApiResource):
        if not file.filename:
            raise FilenameNotExistsError

        if not isinstance(current_user, EndUser):
            raise ValueError("Invalid user account")
        if not current_user:
            raise ValueError("current_user is required")
        upload_file = FileService(db.engine).upload_file(

@@ -403,6 +406,9 @@ class DocumentUpdateByFileApi(DatasetApiResource):
        if not current_user:
            raise ValueError("current_user is required")

        if not isinstance(current_user, EndUser):
            raise ValueError("Invalid user account")

        try:
            upload_file = FileService(db.engine).upload_file(
                filename=file.filename,
@@ -551,7 +551,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                total_steps=validated_state.node_run_steps,
                outputs=event.outputs,
                exceptions_count=event.exceptions_count,
                conversation_id=self._conversation_id,
                conversation_id=None,
                trace_manager=trace_manager,
                external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
            )
@@ -4,7 +4,7 @@ from enum import StrEnum
from threading import Lock
from typing import Any

import httpx
from httpx import Timeout, post
from pydantic import BaseModel
from yarl import URL

@@ -13,17 +13,9 @@ from core.helper.code_executor.javascript.javascript_transformer import NodeJsTe
from core.helper.code_executor.jinja2.jinja2_transformer import Jinja2TemplateTransformer
from core.helper.code_executor.python3.python3_transformer import Python3TemplateTransformer
from core.helper.code_executor.template_transformer import TemplateTransformer
from core.helper.http_client_pooling import get_pooled_http_client

logger = logging.getLogger(__name__)

code_execution_endpoint_url = URL(str(dify_config.CODE_EXECUTION_ENDPOINT))
CODE_EXECUTION_SSL_VERIFY = dify_config.CODE_EXECUTION_SSL_VERIFY
_CODE_EXECUTOR_CLIENT_LIMITS = httpx.Limits(
    max_connections=dify_config.CODE_EXECUTION_POOL_MAX_CONNECTIONS,
    max_keepalive_connections=dify_config.CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS,
    keepalive_expiry=dify_config.CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY,
)
_CODE_EXECUTOR_CLIENT_KEY = "code_executor:http_client"


class CodeExecutionError(Exception):

@@ -46,13 +38,6 @@ class CodeLanguage(StrEnum):
    JAVASCRIPT = "javascript"


def _build_code_executor_client() -> httpx.Client:
    return httpx.Client(
        verify=CODE_EXECUTION_SSL_VERIFY,
        limits=_CODE_EXECUTOR_CLIENT_LIMITS,
    )


class CodeExecutor:
    dependencies_cache: dict[str, str] = {}
    dependencies_cache_lock = Lock()

@@ -91,21 +76,17 @@ class CodeExecutor:
            "enable_network": True,
        }

        timeout = httpx.Timeout(
            connect=dify_config.CODE_EXECUTION_CONNECT_TIMEOUT,
            read=dify_config.CODE_EXECUTION_READ_TIMEOUT,
            write=dify_config.CODE_EXECUTION_WRITE_TIMEOUT,
            pool=None,
        )

        client = get_pooled_http_client(_CODE_EXECUTOR_CLIENT_KEY, _build_code_executor_client)

        try:
            response = client.post(
            response = post(
                str(url),
                json=data,
                headers=headers,
                timeout=timeout,
                timeout=Timeout(
                    connect=dify_config.CODE_EXECUTION_CONNECT_TIMEOUT,
                    read=dify_config.CODE_EXECUTION_READ_TIMEOUT,
                    write=dify_config.CODE_EXECUTION_WRITE_TIMEOUT,
                    pool=None,
                ),
            )
            if response.status_code == 503:
                raise CodeExecutionError("Code execution service is unavailable")

@@ -125,8 +106,8 @@ class CodeExecutor:

        try:
            response_data = response.json()
        except Exception as e:
            raise CodeExecutionError("Failed to parse response") from e
        except:
            raise CodeExecutionError("Failed to parse response")

        if (code := response_data.get("code")) != 0:
            raise CodeExecutionError(f"Got error code: {code}. Got error msg: {response_data.get('message')}")
@@ -1,59 +0,0 @@
"""HTTP client pooling utilities."""

from __future__ import annotations

import atexit
import threading
from collections.abc import Callable

import httpx

ClientBuilder = Callable[[], httpx.Client]


class HttpClientPoolFactory:
    """Thread-safe factory that maintains reusable HTTP client instances."""

    def __init__(self) -> None:
        self._clients: dict[str, httpx.Client] = {}
        self._lock = threading.Lock()

    def get_or_create(self, key: str, builder: ClientBuilder) -> httpx.Client:
        """Return a pooled client associated with ``key`` creating it on demand."""
        client = self._clients.get(key)
        if client is not None:
            return client

        with self._lock:
            client = self._clients.get(key)
            if client is None:
                client = builder()
                self._clients[key] = client
            return client

    def close_all(self) -> None:
        """Close all pooled clients and clear the pool."""
        with self._lock:
            for client in self._clients.values():
                client.close()
            self._clients.clear()


_factory = HttpClientPoolFactory()


def get_pooled_http_client(key: str, builder: ClientBuilder) -> httpx.Client:
    """Return a pooled client for the given ``key`` using ``builder`` when missing."""
    return _factory.get_or_create(key, builder)


def close_all_pooled_clients() -> None:
    """Close every client created through the pooling factory."""
    _factory.close_all()


def _register_shutdown_hook() -> None:
    atexit.register(close_all_pooled_clients)


_register_shutdown_hook()
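As a usage sketch (assuming the module is importable as core.helper.http_client_pooling, as the imports elsewhere in this diff suggest), repeated lookups with the same key reuse one httpx.Client, so keep-alive connections are shared:

```python
import httpx

from core.helper.http_client_pooling import get_pooled_http_client


def _build_client() -> httpx.Client:
    # Builder runs only on the first lookup for this key.
    return httpx.Client(timeout=httpx.Timeout(10.0))


# "example:pooled_client" is a made-up key for illustration.
client_a = get_pooled_http_client("example:pooled_client", _build_client)
client_b = get_pooled_http_client("example:pooled_client", _build_client)
assert client_a is client_b  # same pooled instance
```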
@@ -23,7 +23,7 @@ def batch_fetch_plugin_manifests(plugin_ids: list[str]) -> Sequence[MarketplaceP
    return []

    url = str(marketplace_api_url / "api/v1/plugins/batch")
    response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
    response = httpx.post(url, json={"plugin_ids": plugin_ids})
    response.raise_for_status()

    return [MarketplacePluginDeclaration(**plugin) for plugin in response.json()["data"]["plugins"]]

@@ -36,7 +36,7 @@ def batch_fetch_plugin_manifests_ignore_deserialization_error(
    return []

    url = str(marketplace_api_url / "api/v1/plugins/batch")
    response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
    response = httpx.post(url, json={"plugin_ids": plugin_ids})
    response.raise_for_status()
    result: list[MarketplacePluginDeclaration] = []
    for plugin in response.json()["data"]["plugins"]:
@@ -8,23 +8,27 @@ import time
import httpx

from configs import dify_config
from core.helper.http_client_pooling import get_pooled_http_client

logger = logging.getLogger(__name__)

SSRF_DEFAULT_MAX_RETRIES = dify_config.SSRF_DEFAULT_MAX_RETRIES

http_request_node_ssl_verify = True  # Default value for http_request_node_ssl_verify is True
try:
    config_value = dify_config.HTTP_REQUEST_NODE_SSL_VERIFY
    http_request_node_ssl_verify_lower = str(config_value).lower()
    if http_request_node_ssl_verify_lower == "true":
        http_request_node_ssl_verify = True
    elif http_request_node_ssl_verify_lower == "false":
        http_request_node_ssl_verify = False
    else:
        raise ValueError("Invalid value. HTTP_REQUEST_NODE_SSL_VERIFY should be 'True' or 'False'")
except NameError:
    http_request_node_ssl_verify = True

BACKOFF_FACTOR = 0.5
STATUS_FORCELIST = [429, 500, 502, 503, 504]

_SSL_VERIFIED_POOL_KEY = "ssrf:verified"
_SSL_UNVERIFIED_POOL_KEY = "ssrf:unverified"
_SSRF_CLIENT_LIMITS = httpx.Limits(
    max_connections=dify_config.SSRF_POOL_MAX_CONNECTIONS,
    max_keepalive_connections=dify_config.SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS,
    keepalive_expiry=dify_config.SSRF_POOL_KEEPALIVE_EXPIRY,
)


class MaxRetriesExceededError(ValueError):
    """Raised when the maximum number of retries is exceeded."""

@@ -32,45 +36,6 @@ class MaxRetriesExceededError(ValueError):
    pass


def _create_proxy_mounts() -> dict[str, httpx.HTTPTransport]:
    return {
        "http://": httpx.HTTPTransport(
            proxy=dify_config.SSRF_PROXY_HTTP_URL,
        ),
        "https://": httpx.HTTPTransport(
            proxy=dify_config.SSRF_PROXY_HTTPS_URL,
        ),
    }


def _build_ssrf_client(verify: bool) -> httpx.Client:
    if dify_config.SSRF_PROXY_ALL_URL:
        return httpx.Client(
            proxy=dify_config.SSRF_PROXY_ALL_URL,
            verify=verify,
            limits=_SSRF_CLIENT_LIMITS,
        )

    if dify_config.SSRF_PROXY_HTTP_URL and dify_config.SSRF_PROXY_HTTPS_URL:
        return httpx.Client(
            mounts=_create_proxy_mounts(),
            verify=verify,
            limits=_SSRF_CLIENT_LIMITS,
        )

    return httpx.Client(verify=verify, limits=_SSRF_CLIENT_LIMITS)


def _get_ssrf_client(ssl_verify_enabled: bool) -> httpx.Client:
    if not isinstance(ssl_verify_enabled, bool):
        raise ValueError("SSRF client verify flag must be a boolean")

    return get_pooled_http_client(
        _SSL_VERIFIED_POOL_KEY if ssl_verify_enabled else _SSL_UNVERIFIED_POOL_KEY,
        lambda: _build_ssrf_client(verify=ssl_verify_enabled),
    )


def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
    if "allow_redirects" in kwargs:
        allow_redirects = kwargs.pop("allow_redirects")

@@ -85,22 +50,33 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
        write=dify_config.SSRF_DEFAULT_WRITE_TIME_OUT,
    )

    # prioritize per-call option, which can be switched on and off inside the HTTP node on the web UI
    verify_option = kwargs.pop("ssl_verify", dify_config.HTTP_REQUEST_NODE_SSL_VERIFY)
    client = _get_ssrf_client(verify_option)
    if "ssl_verify" not in kwargs:
        kwargs["ssl_verify"] = http_request_node_ssl_verify

    ssl_verify = kwargs.pop("ssl_verify")

    retries = 0
    while retries <= max_retries:
        try:
            response = client.request(method=method, url=url, **kwargs)
            if dify_config.SSRF_PROXY_ALL_URL:
                with httpx.Client(proxy=dify_config.SSRF_PROXY_ALL_URL, verify=ssl_verify) as client:
                    response = client.request(method=method, url=url, **kwargs)
            elif dify_config.SSRF_PROXY_HTTP_URL and dify_config.SSRF_PROXY_HTTPS_URL:
                proxy_mounts = {
                    "http://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTP_URL, verify=ssl_verify),
                    "https://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTPS_URL, verify=ssl_verify),
                }
                with httpx.Client(mounts=proxy_mounts, verify=ssl_verify) as client:
                    response = client.request(method=method, url=url, **kwargs)
            else:
                with httpx.Client(verify=ssl_verify) as client:
                    response = client.request(method=method, url=url, **kwargs)

            if response.status_code not in STATUS_FORCELIST:
                return response
            else:
                logger.warning(
                    "Received status code %s for URL %s which is in the force list",
                    response.status_code,
                    url,
                    "Received status code %s for URL %s which is in the force list", response.status_code, url
                )

        except httpx.RequestError as e:
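For context, a small usage sketch of make_request; the import path follows the other imports shown in this diff, and the URL is only a placeholder:

```python
from core.helper import ssrf_proxy  # assumed module path

# ssl_verify is popped from kwargs before the request is issued, so it only selects
# the TLS verification behaviour and is never forwarded to httpx's request() call.
response = ssrf_proxy.make_request("GET", "https://example.com/health", ssl_verify=True)
print(response.status_code)
```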
@@ -1,28 +1,38 @@
import json
import logging
from collections.abc import Sequence
from urllib.parse import urljoin

from sqlalchemy.orm import sessionmaker
from opentelemetry.trace import Link, Status, StatusCode
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker

from core.ops.aliyun_trace.data_exporter.traceclient import (
    TraceClient,
    build_endpoint,
    convert_datetime_to_nanoseconds,
    convert_to_span_id,
    convert_to_trace_id,
    create_link,
    generate_span_id,
)
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData, TraceMetadata
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
from core.ops.aliyun_trace.entities.semconv import (
    GEN_AI_COMPLETION,
    GEN_AI_FRAMEWORK,
    GEN_AI_MODEL_NAME,
    GEN_AI_PROMPT,
    GEN_AI_PROMPT_TEMPLATE_TEMPLATE,
    GEN_AI_PROMPT_TEMPLATE_VARIABLE,
    GEN_AI_RESPONSE_FINISH_REASON,
    GEN_AI_SESSION_ID,
    GEN_AI_SPAN_KIND,
    GEN_AI_SYSTEM,
    GEN_AI_USAGE_INPUT_TOKENS,
    GEN_AI_USAGE_OUTPUT_TOKENS,
    GEN_AI_USAGE_TOTAL_TOKENS,
    GEN_AI_USER_ID,
    INPUT_VALUE,
    OUTPUT_VALUE,
    RETRIEVAL_DOCUMENT,
    RETRIEVAL_QUERY,
    TOOL_DESCRIPTION,

@@ -30,15 +40,6 @@ from core.ops.aliyun_trace.entities.semconv import (
    TOOL_PARAMETERS,
    GenAISpanKind,
)
from core.ops.aliyun_trace.utils import (
    create_common_span_attributes,
    create_links_from_trace_id,
    create_status_from_error,
    extract_retrieval_documents,
    get_user_id_from_message_data,
    get_workflow_node_status,
    serialize_json_data,
)
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import AliyunConfig
from core.ops.entities.trace_entity import (

@@ -51,11 +52,12 @@ from core.ops.entities.trace_entity import (
    ToolTraceInfo,
    WorkflowTraceInfo,
)
from core.rag.models.document import Document
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities import WorkflowNodeExecution
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from extensions.ext_database import db
from models import WorkflowNodeExecutionTriggeredFrom
from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom

logger = logging.getLogger(__name__)
@@ -66,7 +68,8 @@ class AliyunDataTrace(BaseTraceInstance):
        aliyun_config: AliyunConfig,
    ):
        super().__init__(aliyun_config)
        endpoint = build_endpoint(aliyun_config.endpoint, aliyun_config.license_key)
        base_url = aliyun_config.endpoint.rstrip("/")
        endpoint = urljoin(base_url, f"adapt_{aliyun_config.license_key}/api/otlp/traces")
        self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint)

    def trace(self, trace_info: BaseTraceInfo):

@@ -92,422 +95,423 @@ class AliyunDataTrace(BaseTraceInstance):
        try:
            return self.trace_client.get_project_url()
        except Exception as e:
            logger.info("Aliyun get project url failed: %s", str(e), exc_info=True)
            raise ValueError(f"Aliyun get project url failed: {str(e)}")
            logger.info("Aliyun get run url failed: %s", str(e), exc_info=True)
            raise ValueError(f"Aliyun get run url failed: {str(e)}")

    def workflow_trace(self, trace_info: WorkflowTraceInfo):
        trace_metadata = TraceMetadata(
            trace_id=convert_to_trace_id(trace_info.workflow_run_id),
            workflow_span_id=convert_to_span_id(trace_info.workflow_run_id, "workflow"),
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=str(trace_info.metadata.get("user_id") or ""),
            links=create_links_from_trace_id(trace_info.trace_id),
        )

        self.add_workflow_span(trace_info, trace_metadata)
        trace_id = convert_to_trace_id(trace_info.workflow_run_id)
        links = []
        if trace_info.trace_id:
            links.append(create_link(trace_id_str=trace_info.trace_id))
        workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
        self.add_workflow_span(trace_id, workflow_span_id, trace_info, links)

        workflow_node_executions = self.get_workflow_node_executions(trace_info)
        for node_execution in workflow_node_executions:
            node_span = self.build_workflow_node_span(node_execution, trace_info, trace_metadata)
            node_span = self.build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
            self.trace_client.add_span(node_span)
    def message_trace(self, trace_info: MessageTraceInfo):
        message_data = trace_info.message_data
        if message_data is None:
            return

        message_id = trace_info.message_id
        user_id = get_user_id_from_message_data(message_data)
        status = create_status_from_error(trace_info.error)

        trace_metadata = TraceMetadata(
            trace_id=convert_to_trace_id(message_id),
            workflow_span_id=0,
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=user_id,
            links=create_links_from_trace_id(trace_info.trace_id),
        )
        user_id = message_data.from_account_id
        if message_data.from_end_user_id:
            end_user_data: EndUser | None = (
                db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
            )
            if end_user_data is not None:
                user_id = end_user_data.session_id

        inputs_json = serialize_json_data(trace_info.inputs)
        outputs_str = str(trace_info.outputs)
        status: Status = Status(StatusCode.OK)
        if trace_info.error:
            status = Status(StatusCode.ERROR, trace_info.error)

        trace_id = convert_to_trace_id(message_id)
        links = []
        if trace_info.trace_id:
            links.append(create_link(trace_id_str=trace_info.trace_id))

        message_span_id = convert_to_span_id(message_id, "message")
        message_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=None,
            span_id=message_span_id,
            name="message",
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes=create_common_span_attributes(
                session_id=trace_metadata.session_id,
                user_id=trace_metadata.user_id,
                span_kind=GenAISpanKind.CHAIN,
                inputs=inputs_json,
                outputs=outputs_str,
            ),
            attributes={
                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
                GEN_AI_USER_ID: str(user_id),
                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
                GEN_AI_FRAMEWORK: "dify",
                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
                OUTPUT_VALUE: str(trace_info.outputs),
            },
            status=status,
            links=trace_metadata.links,
            links=links,
        )
        self.trace_client.add_span(message_span)

        app_model_config = getattr(message_data, "app_model_config", {})
        app_model_config = getattr(trace_info.message_data, "app_model_config", {})
        pre_prompt = getattr(app_model_config, "pre_prompt", "")
        inputs_data = getattr(message_data, "inputs", {})

        inputs_data = getattr(trace_info.message_data, "inputs", {})
        llm_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=message_span_id,
            span_id=convert_to_span_id(message_id, "llm"),
            name="llm",
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.LLM,
                    inputs=inputs_json,
                    outputs=outputs_str,
                ),
                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
                GEN_AI_USER_ID: str(user_id),
                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
                GEN_AI_FRAMEWORK: "dify",
                GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
                GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
                GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
                GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
                GEN_AI_PROMPT_TEMPLATE_VARIABLE: serialize_json_data(inputs_data),
                GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(inputs_data, ensure_ascii=False),
                GEN_AI_PROMPT_TEMPLATE_TEMPLATE: pre_prompt,
                GEN_AI_PROMPT: inputs_json,
                GEN_AI_COMPLETION: outputs_str,
                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
                GEN_AI_COMPLETION: str(trace_info.outputs),
                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
                OUTPUT_VALUE: str(trace_info.outputs),
            },
            status=status,
            links=trace_metadata.links,
        )
        self.trace_client.add_span(llm_span)
    def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
        if trace_info.message_data is None:
            return

        message_id = trace_info.message_id

        trace_metadata = TraceMetadata(
            trace_id=convert_to_trace_id(message_id),
            workflow_span_id=0,
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=str(trace_info.metadata.get("user_id") or ""),
            links=create_links_from_trace_id(trace_info.trace_id),
        )
        trace_id = convert_to_trace_id(message_id)
        links = []
        if trace_info.trace_id:
            links.append(create_link(trace_id_str=trace_info.trace_id))

        documents_data = extract_retrieval_documents(trace_info.documents)
        documents_json = serialize_json_data(documents_data)
        inputs_str = str(trace_info.inputs)

        dataset_retrieval_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=convert_to_span_id(message_id, "message"),
            span_id=generate_span_id(),
            name="dataset_retrieval",
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.RETRIEVER,
                    inputs=inputs_str,
                    outputs=documents_json,
                ),
                RETRIEVAL_QUERY: inputs_str,
                RETRIEVAL_DOCUMENT: documents_json,
                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
                GEN_AI_FRAMEWORK: "dify",
                RETRIEVAL_QUERY: str(trace_info.inputs),
                RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
                INPUT_VALUE: str(trace_info.inputs),
                OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
            },
            links=trace_metadata.links,
            links=links,
        )
        self.trace_client.add_span(dataset_retrieval_span)
    def tool_trace(self, trace_info: ToolTraceInfo):
        if trace_info.message_data is None:
            return

        message_id = trace_info.message_id
        status = create_status_from_error(trace_info.error)

        trace_metadata = TraceMetadata(
            trace_id=convert_to_trace_id(message_id),
            workflow_span_id=0,
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=str(trace_info.metadata.get("user_id") or ""),
            links=create_links_from_trace_id(trace_info.trace_id),
        )
        status: Status = Status(StatusCode.OK)
        if trace_info.error:
            status = Status(StatusCode.ERROR, trace_info.error)

        tool_config_json = serialize_json_data(trace_info.tool_config)
        tool_inputs_json = serialize_json_data(trace_info.tool_inputs)
        inputs_json = serialize_json_data(trace_info.inputs)
        trace_id = convert_to_trace_id(message_id)
        links = []
        if trace_info.trace_id:
            links.append(create_link(trace_id_str=trace_info.trace_id))

        tool_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=convert_to_span_id(message_id, "message"),
            span_id=generate_span_id(),
            name=trace_info.tool_name,
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.TOOL,
                    inputs=inputs_json,
                    outputs=str(trace_info.tool_outputs),
                ),
                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
                GEN_AI_FRAMEWORK: "dify",
                TOOL_NAME: trace_info.tool_name,
                TOOL_DESCRIPTION: tool_config_json,
                TOOL_PARAMETERS: tool_inputs_json,
                TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
                TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
                OUTPUT_VALUE: str(trace_info.tool_outputs),
            },
            status=status,
            links=trace_metadata.links,
            links=links,
        )
        self.trace_client.add_span(tool_span)
    def get_workflow_node_executions(self, trace_info: WorkflowTraceInfo) -> Sequence[WorkflowNodeExecution]:
        app_id = trace_info.metadata.get("app_id")
        if not app_id:
            raise ValueError("No app_id found in trace_info metadata")

        service_account = self.get_service_account_with_tenant(app_id)

        # through workflow_run_id get all_nodes_execution using repository
        session_factory = sessionmaker(bind=db.engine)
        # Find the app's creator account
        with Session(db.engine, expire_on_commit=False) as session:
            # Get the app to find its creator
            app_id = trace_info.metadata.get("app_id")
            if not app_id:
                raise ValueError("No app_id found in trace_info metadata")
            app_stmt = select(App).where(App.id == app_id)
            app = session.scalar(app_stmt)
            if not app:
                raise ValueError(f"App with id {app_id} not found")

            if not app.created_by:
                raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
            account_stmt = select(Account).where(Account.id == app.created_by)
            service_account = session.scalar(account_stmt)
            if not service_account:
                raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
            current_tenant = (
                session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first()
            )
            if not current_tenant:
                raise ValueError(f"Current tenant not found for account {service_account.id}")
            service_account.set_tenant_id(current_tenant.tenant_id)
        workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
            session_factory=session_factory,
            user=service_account,
            app_id=app_id,
            triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
        )

        return workflow_node_execution_repository.get_by_workflow_run(workflow_run_id=trace_info.workflow_run_id)
        # Get all executions for this workflow run
        workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
            workflow_run_id=trace_info.workflow_run_id
        )
        return workflow_node_executions
    def build_workflow_node_span(
        self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
        self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo, workflow_span_id: int
    ):
        try:
            if node_execution.node_type == NodeType.LLM:
                node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
                node_span = self.build_workflow_llm_span(trace_id, workflow_span_id, trace_info, node_execution)
            elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
                node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
                node_span = self.build_workflow_retrieval_span(trace_id, workflow_span_id, trace_info, node_execution)
            elif node_execution.node_type == NodeType.TOOL:
                node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
                node_span = self.build_workflow_tool_span(trace_id, workflow_span_id, trace_info, node_execution)
            else:
                node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)
                node_span = self.build_workflow_task_span(trace_id, workflow_span_id, trace_info, node_execution)
            return node_span
        except Exception as e:
            logger.debug("Error occurred in build_workflow_node_span: %s", e, exc_info=True)
            return None

    def get_workflow_node_status(self, node_execution: WorkflowNodeExecution) -> Status:
        span_status: Status = Status(StatusCode.UNSET)
        if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
            span_status = Status(StatusCode.OK)
        elif node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
            span_status = Status(StatusCode.ERROR, str(node_execution.error))
        return span_status
    def build_workflow_task_span(
        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
    ) -> SpanData:
        inputs_json = serialize_json_data(node_execution.inputs)
        outputs_json = serialize_json_data(node_execution.outputs)
        return SpanData(
            trace_id=trace_metadata.trace_id,
            parent_span_id=trace_metadata.workflow_span_id,
            trace_id=trace_id,
            parent_span_id=workflow_span_id,
            span_id=convert_to_span_id(node_execution.id, "node"),
            name=node_execution.title,
            start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
            end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
            attributes=create_common_span_attributes(
                session_id=trace_metadata.session_id,
                user_id=trace_metadata.user_id,
                span_kind=GenAISpanKind.TASK,
                inputs=inputs_json,
                outputs=outputs_json,
            ),
            status=get_workflow_node_status(node_execution),
            links=trace_metadata.links,
            attributes={
                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
                GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value,
                GEN_AI_FRAMEWORK: "dify",
                INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False),
                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
            },
            status=self.get_workflow_node_status(node_execution),
        )

    def build_workflow_tool_span(
        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
    ) -> SpanData:
        tool_des = {}
        if node_execution.metadata:
            tool_des = node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})

        inputs_json = serialize_json_data(node_execution.inputs or {})
        outputs_json = serialize_json_data(node_execution.outputs)

        return SpanData(
            trace_id=trace_metadata.trace_id,
            parent_span_id=trace_metadata.workflow_span_id,
            trace_id=trace_id,
            parent_span_id=workflow_span_id,
            span_id=convert_to_span_id(node_execution.id, "node"),
            name=node_execution.title,
            start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
            end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.TOOL,
                    inputs=inputs_json,
                    outputs=outputs_json,
                ),
                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
                GEN_AI_FRAMEWORK: "dify",
                TOOL_NAME: node_execution.title,
                TOOL_DESCRIPTION: serialize_json_data(tool_des),
                TOOL_PARAMETERS: inputs_json,
                TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
                TOOL_PARAMETERS: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
                INPUT_VALUE: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
            },
            status=get_workflow_node_status(node_execution),
            links=trace_metadata.links,
            status=self.get_workflow_node_status(node_execution),
        )
    def build_workflow_retrieval_span(
        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
    ) -> SpanData:
        input_value = str(node_execution.inputs.get("query", "")) if node_execution.inputs else ""
        output_value = serialize_json_data(node_execution.outputs.get("result", [])) if node_execution.outputs else ""

        input_value = ""
        if node_execution.inputs:
            input_value = str(node_execution.inputs.get("query", ""))
        output_value = ""
        if node_execution.outputs:
            output_value = json.dumps(node_execution.outputs.get("result", []), ensure_ascii=False)
        return SpanData(
            trace_id=trace_metadata.trace_id,
            parent_span_id=trace_metadata.workflow_span_id,
            trace_id=trace_id,
            parent_span_id=workflow_span_id,
            span_id=convert_to_span_id(node_execution.id, "node"),
            name=node_execution.title,
            start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
            end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.RETRIEVER,
                    inputs=input_value,
                    outputs=output_value,
                ),
                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
                GEN_AI_FRAMEWORK: "dify",
                RETRIEVAL_QUERY: input_value,
                RETRIEVAL_DOCUMENT: output_value,
                INPUT_VALUE: input_value,
                OUTPUT_VALUE: output_value,
            },
            status=get_workflow_node_status(node_execution),
            links=trace_metadata.links,
            status=self.get_workflow_node_status(node_execution),
        )

    def build_workflow_llm_span(
        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
    ) -> SpanData:
        process_data = node_execution.process_data or {}
        outputs = node_execution.outputs or {}
        usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})

        prompts_json = serialize_json_data(process_data.get("prompts", []))
        text_output = str(outputs.get("text", ""))

        return SpanData(
            trace_id=trace_metadata.trace_id,
            parent_span_id=trace_metadata.workflow_span_id,
            trace_id=trace_id,
            parent_span_id=workflow_span_id,
            span_id=convert_to_span_id(node_execution.id, "node"),
            name=node_execution.title,
            start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
            end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.LLM,
                    inputs=prompts_json,
                    outputs=text_output,
                ),
                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
                GEN_AI_FRAMEWORK: "dify",
                GEN_AI_MODEL_NAME: process_data.get("model_name") or "",
                GEN_AI_SYSTEM: process_data.get("model_provider") or "",
                GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
                GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
                GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
                GEN_AI_PROMPT: prompts_json,
                GEN_AI_COMPLETION: text_output,
                GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
                GEN_AI_COMPLETION: str(outputs.get("text", "")),
                GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason") or "",
                INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
                OUTPUT_VALUE: str(outputs.get("text", "")),
            },
            status=get_workflow_node_status(node_execution),
            links=trace_metadata.links,
            status=self.get_workflow_node_status(node_execution),
        )
    def add_workflow_span(self, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata):
    def add_workflow_span(
        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, links: Sequence[Link]
    ):
        message_span_id = None
        if trace_info.message_id:
            message_span_id = convert_to_span_id(trace_info.message_id, "message")
        status = create_status_from_error(trace_info.error)

        inputs_json = serialize_json_data(trace_info.workflow_run_inputs)
        outputs_json = serialize_json_data(trace_info.workflow_run_outputs)

        if message_span_id:
        user_id = trace_info.metadata.get("user_id")
        status: Status = Status(StatusCode.OK)
        if trace_info.error:
            status = Status(StatusCode.ERROR, trace_info.error)
        if message_span_id:  # chatflow
            message_span = SpanData(
                trace_id=trace_metadata.trace_id,
                trace_id=trace_id,
                parent_span_id=None,
                span_id=message_span_id,
                name="message",
                start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
                end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
                attributes=create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.CHAIN,
                    inputs=trace_info.workflow_run_inputs.get("sys.query") or "",
                    outputs=outputs_json,
                ),
                attributes={
                    GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
                    GEN_AI_USER_ID: str(user_id),
                    GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
                    GEN_AI_FRAMEWORK: "dify",
                    INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query") or "",
                    OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
                },
                status=status,
                links=trace_metadata.links,
                links=links,
            )
            self.trace_client.add_span(message_span)

        workflow_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=message_span_id,
            span_id=trace_metadata.workflow_span_id,
            span_id=workflow_span_id,
            name="workflow",
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes=create_common_span_attributes(
                session_id=trace_metadata.session_id,
                user_id=trace_metadata.user_id,
                span_kind=GenAISpanKind.CHAIN,
                inputs=inputs_json,
                outputs=outputs_json,
            ),
            attributes={
                GEN_AI_USER_ID: str(user_id),
                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
                GEN_AI_FRAMEWORK: "dify",
                INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
                OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
            },
            status=status,
            links=trace_metadata.links,
            links=links,
        )
        self.trace_client.add_span(workflow_span)
    def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
        message_id = trace_info.message_id
        status = create_status_from_error(trace_info.error)
        status: Status = Status(StatusCode.OK)
        if trace_info.error:
            status = Status(StatusCode.ERROR, trace_info.error)

        trace_metadata = TraceMetadata(
            trace_id=convert_to_trace_id(message_id),
            workflow_span_id=0,
            session_id=trace_info.metadata.get("conversation_id") or "",
            user_id=str(trace_info.metadata.get("user_id") or ""),
            links=create_links_from_trace_id(trace_info.trace_id),
        )

        inputs_json = serialize_json_data(trace_info.inputs)
        suggested_question_json = serialize_json_data(trace_info.suggested_question)
        trace_id = convert_to_trace_id(message_id)
        links = []
        if trace_info.trace_id:
            links.append(create_link(trace_id_str=trace_info.trace_id))

        suggested_question_span = SpanData(
            trace_id=trace_metadata.trace_id,
            trace_id=trace_id,
            parent_span_id=convert_to_span_id(message_id, "message"),
            span_id=convert_to_span_id(message_id, "suggested_question"),
            name="suggested_question",
            start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
            end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
            attributes={
                **create_common_span_attributes(
                    session_id=trace_metadata.session_id,
                    user_id=trace_metadata.user_id,
                    span_kind=GenAISpanKind.LLM,
                    inputs=inputs_json,
                    outputs=suggested_question_json,
                ),
                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
                GEN_AI_FRAMEWORK: "dify",
                GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
                GEN_AI_PROMPT: inputs_json,
                GEN_AI_COMPLETION: suggested_question_json,
                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
                GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
                OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
            },
            status=status,
            links=trace_metadata.links,
            links=links,
        )
        self.trace_client.add_span(suggested_question_span)

def extract_retrieval_documents(documents: list[Document]):
|
||||
documents_data = []
|
||||
for document in documents:
|
||||
document_data = {
|
||||
"content": document.page_content,
|
||||
"metadata": {
|
||||
"dataset_id": document.metadata.get("dataset_id"),
|
||||
"doc_id": document.metadata.get("doc_id"),
|
||||
"document_id": document.metadata.get("document_id"),
|
||||
},
|
||||
"score": document.metadata.get("score"),
|
||||
}
|
||||
documents_data.append(document_data)
|
||||
return documents_data
|
||||
|
||||
@ -7,8 +7,6 @@ import uuid
from collections import deque
from collections.abc import Sequence
from datetime import datetime
from typing import Final
from urllib.parse import urljoin

import httpx
from opentelemetry import trace as trace_api
@ -22,12 +20,8 @@ from opentelemetry.trace import Link, SpanContext, TraceFlags
from configs import dify_config
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData

INVALID_SPAN_ID: Final[int] = 0x0000000000000000
INVALID_TRACE_ID: Final[int] = 0x00000000000000000000000000000000
DEFAULT_TIMEOUT: Final[int] = 5
DEFAULT_MAX_QUEUE_SIZE: Final[int] = 1000
DEFAULT_SCHEDULE_DELAY_SEC: Final[int] = 5
DEFAULT_MAX_EXPORT_BATCH_SIZE: Final[int] = 50
INVALID_SPAN_ID = 0x0000000000000000
INVALID_TRACE_ID = 0x00000000000000000000000000000000

logger = logging.getLogger(__name__)

@ -37,9 +31,9 @@ class TraceClient:
self,
service_name: str,
endpoint: str,
max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE,
schedule_delay_sec: int = DEFAULT_SCHEDULE_DELAY_SEC,
max_export_batch_size: int = DEFAULT_MAX_EXPORT_BATCH_SIZE,
max_queue_size: int = 1000,
schedule_delay_sec: int = 5,
max_export_batch_size: int = 50,
):
self.endpoint = endpoint
self.resource = Resource(
@ -69,9 +63,9 @@ class TraceClient:
def export(self, spans: Sequence[ReadableSpan]):
self.exporter.export(spans)

def api_check(self) -> bool:
def api_check(self):
try:
response = httpx.head(self.endpoint, timeout=DEFAULT_TIMEOUT)
response = httpx.head(self.endpoint, timeout=5)
if response.status_code == 405:
return True
else:
@ -81,13 +75,12 @@ class TraceClient:
logger.debug("AliyunTrace API check failed: %s", str(e))
raise ValueError(f"AliyunTrace API check failed: {str(e)}")

def get_project_url(self) -> str:
def get_project_url(self):
return "https://arms.console.aliyun.com/#/llm"

def add_span(self, span_data: SpanData | None) -> None:
def add_span(self, span_data: SpanData):
if span_data is None:
return

span: ReadableSpan = self.span_builder.build_span(span_data)
with self.condition:
if len(self.queue) == self.max_queue_size:
@ -99,14 +92,14 @@ class TraceClient:
if len(self.queue) >= self.max_export_batch_size:
self.condition.notify()

def _worker(self) -> None:
def _worker(self):
while not self.done:
with self.condition:
if len(self.queue) < self.max_export_batch_size and not self.done:
self.condition.wait(timeout=self.schedule_delay_sec)
self._export_batch()

def _export_batch(self) -> None:
def _export_batch(self):
spans_to_export: list[ReadableSpan] = []
with self.condition:
while len(spans_to_export) < self.max_export_batch_size and self.queue:
@ -118,7 +111,7 @@ class TraceClient:
except Exception as e:
logger.debug("Error exporting spans: %s", e)

def shutdown(self) -> None:
def shutdown(self):
with self.condition:
self.done = True
self.condition.notify_all()
@ -128,7 +121,7 @@ class TraceClient:

class SpanBuilder:
def __init__(self, resource: Resource) -> None:
def __init__(self, resource):
self.resource = resource
self.instrumentation_scope = InstrumentationScope(
__name__,
@ -174,12 +167,8 @@ class SpanBuilder:

def create_link(trace_id_str: str) -> Link:
placeholder_span_id = INVALID_SPAN_ID
try:
trace_id = int(trace_id_str, 16)
except ValueError as e:
raise ValueError(f"Invalid trace ID format: {trace_id_str}") from e

placeholder_span_id = 0x0000000000000000
trace_id = int(trace_id_str, 16)
span_context = SpanContext(
trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED)
)
@ -195,29 +184,26 @@ def generate_span_id() -> int:

def convert_to_trace_id(uuid_v4: str | None) -> int:
if uuid_v4 is None:
raise ValueError("UUID cannot be None")
try:
uuid_obj = uuid.UUID(uuid_v4)
return uuid_obj.int
except ValueError as e:
raise ValueError(f"Invalid UUID input: {uuid_v4}") from e
except Exception as e:
raise ValueError(f"Invalid UUID input: {e}")

def convert_string_to_id(string: str | None) -> int:
if not string:
return generate_span_id()
hash_bytes = hashlib.sha256(string.encode("utf-8")).digest()
return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
return id

def convert_to_span_id(uuid_v4: str | None, span_type: str) -> int:
if uuid_v4 is None:
raise ValueError("UUID cannot be None")
try:
uuid_obj = uuid.UUID(uuid_v4)
except ValueError as e:
raise ValueError(f"Invalid UUID input: {uuid_v4}") from e
except Exception as e:
raise ValueError(f"Invalid UUID input: {e}")
combined_key = f"{uuid_obj.hex}-{span_type}"
return convert_string_to_id(combined_key)

@ -226,11 +212,5 @@ def convert_datetime_to_nanoseconds(start_time_a: datetime | None) -> int | None
if start_time_a is None:
return None
timestamp_in_seconds = start_time_a.timestamp()
return int(timestamp_in_seconds * 1e9)

def build_endpoint(base_url: str, license_key: str) -> str:
if "log.aliyuncs.com" in base_url: # cms2.0 endpoint
return urljoin(base_url, f"adapt_{license_key}/api/v1/traces")
else: # xtrace endpoint
return urljoin(base_url, f"adapt_{license_key}/api/otlp/traces")
timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
return timestamp_in_nanoseconds
@ -1,33 +1,18 @@
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any

from opentelemetry import trace as trace_api
from opentelemetry.sdk.trace import Event, Status, StatusCode
from pydantic import BaseModel, Field

@dataclass
class TraceMetadata:
"""Metadata for trace operations, containing common attributes for all spans in a trace."""

trace_id: int
workflow_span_id: int
session_id: str
user_id: str
links: list[trace_api.Link]

class SpanData(BaseModel):
"""Data model for span information in Aliyun trace system."""

model_config = {"arbitrary_types_allowed": True}

trace_id: int = Field(..., description="The unique identifier for the trace.")
parent_span_id: int | None = Field(None, description="The ID of the parent span, if any.")
span_id: int = Field(..., description="The unique identifier for this span.")
name: str = Field(..., description="The name of the span.")
attributes: dict[str, Any] = Field(default_factory=dict, description="Attributes associated with the span.")
attributes: dict[str, str] = Field(default_factory=dict, description="Attributes associated with the span.")
events: Sequence[Event] = Field(default_factory=list, description="Events recorded in the span.")
links: Sequence[trace_api.Link] = Field(default_factory=list, description="Links to other spans.")
status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
@ -1,37 +1,56 @@
from enum import StrEnum
from typing import Final

# Public attributes
GEN_AI_SESSION_ID: Final[str] = "gen_ai.session.id"
GEN_AI_USER_ID: Final[str] = "gen_ai.user.id"
GEN_AI_USER_NAME: Final[str] = "gen_ai.user.name"
GEN_AI_SPAN_KIND: Final[str] = "gen_ai.span.kind"
GEN_AI_FRAMEWORK: Final[str] = "gen_ai.framework"
# public
GEN_AI_SESSION_ID = "gen_ai.session.id"

# Chain attributes
INPUT_VALUE: Final[str] = "input.value"
OUTPUT_VALUE: Final[str] = "output.value"
GEN_AI_USER_ID = "gen_ai.user.id"

# Retriever attributes
RETRIEVAL_QUERY: Final[str] = "retrieval.query"
RETRIEVAL_DOCUMENT: Final[str] = "retrieval.document"
GEN_AI_USER_NAME = "gen_ai.user.name"

# LLM attributes
GEN_AI_MODEL_NAME: Final[str] = "gen_ai.model_name"
GEN_AI_SYSTEM: Final[str] = "gen_ai.system"
GEN_AI_USAGE_INPUT_TOKENS: Final[str] = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS: Final[str] = "gen_ai.usage.output_tokens"
GEN_AI_USAGE_TOTAL_TOKENS: Final[str] = "gen_ai.usage.total_tokens"
GEN_AI_PROMPT_TEMPLATE_TEMPLATE: Final[str] = "gen_ai.prompt_template.template"
GEN_AI_PROMPT_TEMPLATE_VARIABLE: Final[str] = "gen_ai.prompt_template.variable"
GEN_AI_PROMPT: Final[str] = "gen_ai.prompt"
GEN_AI_COMPLETION: Final[str] = "gen_ai.completion"
GEN_AI_RESPONSE_FINISH_REASON: Final[str] = "gen_ai.response.finish_reason"
GEN_AI_SPAN_KIND = "gen_ai.span.kind"

# Tool attributes
TOOL_NAME: Final[str] = "tool.name"
TOOL_DESCRIPTION: Final[str] = "tool.description"
TOOL_PARAMETERS: Final[str] = "tool.parameters"
GEN_AI_FRAMEWORK = "gen_ai.framework"

# Chain
INPUT_VALUE = "input.value"
OUTPUT_VALUE = "output.value"

# Retriever
RETRIEVAL_QUERY = "retrieval.query"
RETRIEVAL_DOCUMENT = "retrieval.document"

# LLM
GEN_AI_MODEL_NAME = "gen_ai.model_name"
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
GEN_AI_PROMPT_TEMPLATE_TEMPLATE = "gen_ai.prompt_template.template"
GEN_AI_PROMPT_TEMPLATE_VARIABLE = "gen_ai.prompt_template.variable"
GEN_AI_PROMPT = "gen_ai.prompt"
GEN_AI_COMPLETION = "gen_ai.completion"
GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"

# Tool
TOOL_NAME = "tool.name"
TOOL_DESCRIPTION = "tool.description"
TOOL_PARAMETERS = "tool.parameters"

class GenAISpanKind(StrEnum):
@ -1,95 +0,0 @@
import json
from typing import Any

from opentelemetry.trace import Link, Status, StatusCode

from core.ops.aliyun_trace.entities.semconv import (
GEN_AI_FRAMEWORK,
GEN_AI_SESSION_ID,
GEN_AI_SPAN_KIND,
GEN_AI_USER_ID,
INPUT_VALUE,
OUTPUT_VALUE,
GenAISpanKind,
)
from core.rag.models.document import Document
from core.workflow.entities import WorkflowNodeExecution
from core.workflow.enums import WorkflowNodeExecutionStatus
from extensions.ext_database import db
from models import EndUser

# Constants
DEFAULT_JSON_ENSURE_ASCII = False
DEFAULT_FRAMEWORK_NAME = "dify"

def get_user_id_from_message_data(message_data) -> str:
user_id = message_data.from_account_id
if message_data.from_end_user_id:
end_user_data: EndUser | None = (
db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
)
if end_user_data is not None:
user_id = end_user_data.session_id
return user_id

def create_status_from_error(error: str | None) -> Status:
if error:
return Status(StatusCode.ERROR, error)
return Status(StatusCode.OK)

def get_workflow_node_status(node_execution: WorkflowNodeExecution) -> Status:
if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
return Status(StatusCode.OK)
if node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
return Status(StatusCode.ERROR, str(node_execution.error))
return Status(StatusCode.UNSET)

def create_links_from_trace_id(trace_id: str | None) -> list[Link]:
from core.ops.aliyun_trace.data_exporter.traceclient import create_link

links = []
if trace_id:
links.append(create_link(trace_id_str=trace_id))
return links

def extract_retrieval_documents(documents: list[Document]) -> list[dict[str, Any]]:
documents_data = []
for document in documents:
document_data = {
"content": document.page_content,
"metadata": {
"dataset_id": document.metadata.get("dataset_id"),
"doc_id": document.metadata.get("doc_id"),
"document_id": document.metadata.get("document_id"),
},
"score": document.metadata.get("score"),
}
documents_data.append(document_data)
return documents_data

def serialize_json_data(data: Any, ensure_ascii: bool = DEFAULT_JSON_ENSURE_ASCII) -> str:
return json.dumps(data, ensure_ascii=ensure_ascii)

def create_common_span_attributes(
session_id: str = "",
user_id: str = "",
span_kind: str = GenAISpanKind.CHAIN,
framework: str = DEFAULT_FRAMEWORK_NAME,
inputs: str = "",
outputs: str = "",
) -> dict[str, Any]:
return {
GEN_AI_SESSION_ID: session_id,
GEN_AI_USER_ID: user_id,
GEN_AI_SPAN_KIND: span_kind,
GEN_AI_FRAMEWORK: framework,
INPUT_VALUE: inputs,
OUTPUT_VALUE: outputs,
}
@ -191,8 +191,7 @@ class AliyunConfig(BaseTracingConfig):
@field_validator("endpoint")
@classmethod
def endpoint_validator(cls, v, info: ValidationInfo):
# aliyun uses two URL formats, which may include a URL path
return validate_url_with_path(v, "https://tracing-analysis-dc-hz.aliyuncs.com")
return cls.validate_endpoint_url(v, "https://tracing-analysis-dc-hz.aliyuncs.com")

OPS_FILE_PATH = "ops_trace/"
@ -8,7 +8,6 @@ from typing import Any
import httpx
from sqlalchemy import select
from sqlalchemy.orm import Session
from werkzeug.http import parse_options_header

from constants import AUDIO_EXTENSIONS, DOCUMENT_EXTENSIONS, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS
from core.file import File, FileBelongsTo, FileTransferMethod, FileType, FileUploadConfig, helpers
@ -248,25 +247,6 @@ def _build_from_remote_url(
)

def _extract_filename(url_path: str, content_disposition: str | None) -> str | None:
filename = None
# Try to extract from Content-Disposition header first
if content_disposition:
_, params = parse_options_header(content_disposition)
# RFC 5987 https://datatracker.ietf.org/doc/html/rfc5987: filename* takes precedence over filename
filename = params.get("filename*") or params.get("filename")
# Fallback to URL path if no filename from header
if not filename:
filename = os.path.basename(url_path)
return filename or None

def _guess_mime_type(filename: str) -> str:
"""Guess MIME type from filename, returning empty string if None."""
guessed_mime, _ = mimetypes.guess_type(filename)
return guessed_mime or ""

def _get_remote_file_info(url: str):
file_size = -1
parsed_url = urllib.parse.urlparse(url)
@ -274,26 +254,23 @@ def _get_remote_file_info(url: str):
filename = os.path.basename(url_path)

# Initialize mime_type from filename as fallback
mime_type = _guess_mime_type(filename)
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = ""

resp = ssrf_proxy.head(url, follow_redirects=True)
if resp.status_code == httpx.codes.OK:
content_disposition = resp.headers.get("Content-Disposition")
extracted_filename = _extract_filename(url_path, content_disposition)
if extracted_filename:
filename = extracted_filename
mime_type = _guess_mime_type(filename)
if content_disposition := resp.headers.get("Content-Disposition"):
filename = str(content_disposition.split("filename=")[-1].strip('"'))
# Re-guess mime_type from updated filename
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = ""
file_size = int(resp.headers.get("Content-Length", file_size))
# Fallback to Content-Type header if mime_type is still empty
if not mime_type:
mime_type = resp.headers.get("Content-Type", "").split(";")[0].strip()

if not filename:
extension = mimetypes.guess_extension(mime_type) or ".bin"
filename = f"{uuid.uuid4().hex}{extension}"
if not mime_type:
mime_type = _guess_mime_type(filename)

return mime_type, filename, file_size
@ -0,0 +1,37 @@
"""remove-builtin-template-user

Revision ID: bf0bcbf45396
Revises: 68519ad5cd18
Create Date: 2025-09-25 16:50:32.245503

"""
from alembic import op
import models as models
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'bf0bcbf45396'
down_revision = '68519ad5cd18'
branch_labels = None
depends_on = None

def upgrade():
# ### commands auto generated by Alembic - please adjust! ###

with op.batch_alter_table('pipeline_built_in_templates', schema=None) as batch_op:
batch_op.drop_column('updated_by')
batch_op.drop_column('created_by')

# ### end Alembic commands ###

def downgrade():
# ### commands auto generated by Alembic - please adjust! ###

with op.batch_alter_table('pipeline_built_in_templates', schema=None) as batch_op:
batch_op.add_column(sa.Column('created_by', sa.UUID(), autoincrement=False, nullable=False))
batch_op.add_column(sa.Column('updated_by', sa.UUID(), autoincrement=False, nullable=True))

# ### end Alembic commands ###
@ -1239,15 +1239,6 @@ class PipelineBuiltInTemplate(Base): # type: ignore[name-defined]
language = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
created_by = db.Column(StringUUID, nullable=False)
updated_by = db.Column(StringUUID, nullable=True)

@property
def created_user_name(self):
account = db.session.query(Account).where(Account.id == self.created_by).first()
if account:
return account.name
return ""

class PipelineCustomizedTemplate(Base): # type: ignore[name-defined]
@ -128,3 +128,10 @@ class KnowledgeConfiguration(BaseModel):
if v is None:
return ""
return v

class PipelineBuiltInTemplateEntity(BaseModel):
template_id: str | None = None
name: str
description: str
language: str

@ -74,5 +74,4 @@ class DatabasePipelineTemplateRetrieval(PipelineTemplateRetrievalBase):
"chunk_structure": pipeline_template.chunk_structure,
"export_data": pipeline_template.yaml_content,
"graph": graph_data,
"created_by": pipeline_template.created_user_name,
}
@ -8,6 +8,7 @@ from datetime import UTC, datetime
from typing import Any, Union, cast
from uuid import uuid4

import yaml
from flask_login import current_user
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session, sessionmaker
@ -60,6 +61,7 @@ from models.dataset import ( # type: ignore
Document,
DocumentPipelineExecutionLog,
Pipeline,
PipelineBuiltInTemplate,
PipelineCustomizedTemplate,
PipelineRecommendedPlugin,
)
@ -76,6 +78,7 @@ from repositories.factory import DifyAPIRepositoryFactory
from services.datasource_provider_service import DatasourceProviderService
from services.entities.knowledge_entities.rag_pipeline_entities import (
KnowledgeConfiguration,
PipelineBuiltInTemplateEntity,
PipelineTemplateInfoEntity,
)
from services.errors.app import WorkflowHashNotEqualError
@ -1454,3 +1457,140 @@ class RagPipelineService:
if not pipeline:
raise ValueError("Pipeline not found")
return pipeline

def install_built_in_pipeline_template(
self, args: PipelineBuiltInTemplateEntity, file_content: str, auth_token: str
) -> None:
"""
Install built-in pipeline template

Args:
args: Pipeline built-in template entity with template metadata
file_content: YAML content of the pipeline template
auth_token: Authentication token for authorization

Raises:
ValueError: If validation fails or template processing errors occur
"""
# Validate authentication
self._validate_auth_token(auth_token)

# Parse and validate template content
pipeline_template_dsl = self._parse_template_content(file_content)

# Extract template metadata
icon = self._extract_icon_metadata(pipeline_template_dsl)
chunk_structure = self._extract_chunk_structure(pipeline_template_dsl)

# Prepare template data
template_data = {
"name": args.name,
"description": args.description,
"chunk_structure": chunk_structure,
"icon": icon,
"language": args.language,
"yaml_content": file_content,
}

# Use transaction for database operations
try:
if args.template_id:
self._update_existing_template(args.template_id, template_data)
else:
self._create_new_template(template_data)
db.session.commit()
except Exception as e:
db.session.rollback()
raise ValueError(f"Failed to install pipeline template: {str(e)}")

def _validate_auth_token(self, auth_token: str) -> None:
"""Validate the authentication token"""
config_auth_token = dify_config.UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN
if not config_auth_token:
raise ValueError("Auth token configuration is required")
if config_auth_token != auth_token:
raise ValueError("Auth token is incorrect")

def _parse_template_content(self, file_content: str) -> dict:
"""Parse and validate YAML template content"""
try:
pipeline_template_dsl = yaml.safe_load(file_content)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML content: {str(e)}")

if not pipeline_template_dsl:
raise ValueError("Pipeline template DSL is required")

return pipeline_template_dsl

def _extract_icon_metadata(self, pipeline_template_dsl: dict) -> dict:
"""Extract icon metadata from template DSL"""
rag_pipeline_info = pipeline_template_dsl.get("rag_pipeline", {})

return {
"icon": rag_pipeline_info.get("icon", "📙"),
"icon_type": rag_pipeline_info.get("icon_type", "emoji"),
"icon_background": rag_pipeline_info.get("icon_background", "#FFEAD5"),
"icon_url": rag_pipeline_info.get("icon_url"),
}

def _extract_chunk_structure(self, pipeline_template_dsl: dict) -> str:
"""Extract chunk structure from template DSL"""
nodes = pipeline_template_dsl.get("workflow", {}).get("graph", {}).get("nodes", [])

# Use generator expression for efficiency
chunk_structure = next(
(
node.get("data", {}).get("chunk_structure")
for node in nodes
if node.get("data", {}).get("type") == NodeType.KNOWLEDGE_INDEX.value
),
None
)

if not chunk_structure:
raise ValueError("Chunk structure is required in template")

return chunk_structure

def _update_existing_template(self, template_id: str, template_data: dict) -> None:
"""Update an existing pipeline template"""
pipeline_built_in_template = (
db.session.query(PipelineBuiltInTemplate)
.filter(PipelineBuiltInTemplate.id == template_id)
.first()
)

if not pipeline_built_in_template:
raise ValueError(f"Pipeline built-in template not found: {template_id}")

# Update template fields
for key, value in template_data.items():
setattr(pipeline_built_in_template, key, value)

db.session.add(pipeline_built_in_template)

def _create_new_template(self, template_data: dict) -> None:
"""Create a new pipeline template"""
# Get the next available position
position = self._get_next_position(template_data["language"])

# Add additional fields for new template
template_data.update({
"position": position,
"install_count": 0,
"copyright": dify_config.KNOWLEDGE_PIPELINE_TEMPLATE_COPYRIGHT,
"privacy_policy": dify_config.KNOWLEDGE_PIPELINE_TEMPLATE_PRIVACY_POLICY,
})

new_template = PipelineBuiltInTemplate(**template_data)
db.session.add(new_template)

def _get_next_position(self, language: str) -> int:
"""Get the next available position for a template in the specified language"""
max_position = (
db.session.query(func.max(PipelineBuiltInTemplate.position))
.filter(PipelineBuiltInTemplate.language == language)
.scalar()
)
return (max_position or 0) + 1
@ -167,6 +167,7 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800

# App configuration
@ -40,6 +40,8 @@ def test_dify_config(monkeypatch: pytest.MonkeyPatch):
# annotated field with configured value
assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30

assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3

# values from pyproject.toml
assert Version(config.project.version) >= Version("1.0.0")
@ -329,20 +329,20 @@ class TestAliyunConfig:
assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"

def test_endpoint_validation_with_path(self):
"""Test endpoint validation preserves path for Aliyun endpoints"""
"""Test endpoint validation normalizes URL by removing path"""
config = AliyunConfig(
license_key="test_license", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"
)
assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"
assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"

def test_endpoint_validation_invalid_scheme(self):
"""Test endpoint validation rejects invalid schemes"""
with pytest.raises(ValidationError, match="URL must start with https:// or http://"):
with pytest.raises(ValidationError, match="URL scheme must be one of"):
AliyunConfig(license_key="test_license", endpoint="ftp://invalid.tracing-analysis-dc-hz.aliyuncs.com")

def test_endpoint_validation_no_scheme(self):
"""Test endpoint validation rejects URLs without scheme"""
with pytest.raises(ValidationError, match="URL must start with https:// or http://"):
with pytest.raises(ValidationError, match="URL scheme must be one of"):
AliyunConfig(license_key="test_license", endpoint="invalid.tracing-analysis-dc-hz.aliyuncs.com")

def test_license_key_required(self):
@ -350,23 +350,6 @@ class TestAliyunConfig:
with pytest.raises(ValidationError):
AliyunConfig(license_key="", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com")

def test_valid_endpoint_format_examples(self):
"""Test valid endpoint format examples from comments"""
valid_endpoints = [
# cms2.0 public endpoint
"https://proj-xtrace-123456-cn-heyuan.cn-heyuan.log.aliyuncs.com/apm/trace/opentelemetry",
# cms2.0 intranet endpoint
"https://proj-xtrace-123456-cn-heyuan.cn-heyuan-intranet.log.aliyuncs.com/apm/trace/opentelemetry",
# xtrace public endpoint
"http://tracing-cn-heyuan.arms.aliyuncs.com",
# xtrace intranet endpoint
"http://tracing-cn-heyuan-internal.arms.aliyuncs.com",
]

for endpoint in valid_endpoints:
config = AliyunConfig(license_key="test_license", endpoint=endpoint)
assert config.endpoint == endpoint

class TestConfigIntegration:
"""Integration tests for configuration classes"""
@ -399,7 +382,7 @@ class TestConfigIntegration:
assert arize_config.endpoint == "https://arize.com"
assert phoenix_with_path_config.endpoint == "https://app.phoenix.arize.com/s/dify-integration"
assert phoenix_without_path_config.endpoint == "https://app.phoenix.arize.com"
assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces"
assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com"

def test_project_default_values(self):
"""Test that project default values are set correctly"""
@ -1,115 +0,0 @@
import re

import pytest

from factories.file_factory import _get_remote_file_info

class _FakeResponse:
def __init__(self, status_code: int, headers: dict[str, str]):
self.status_code = status_code
self.headers = headers

def _mock_head(monkeypatch: pytest.MonkeyPatch, headers: dict[str, str], status_code: int = 200):
def _fake_head(url: str, follow_redirects: bool = True):
return _FakeResponse(status_code=status_code, headers=headers)

monkeypatch.setattr("factories.file_factory.ssrf_proxy.head", _fake_head)

class TestGetRemoteFileInfo:
"""Tests for _get_remote_file_info focusing on filename extraction rules."""

def test_inline_no_filename(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": "inline",
"Content-Type": "application/pdf",
"Content-Length": "123",
},
)
mime_type, filename, size = _get_remote_file_info("http://example.com/some/path/file.pdf")
assert filename == "file.pdf"
assert mime_type == "application/pdf"
assert size == 123

def test_attachment_no_filename(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": "attachment",
"Content-Type": "application/octet-stream",
"Content-Length": "456",
},
)
mime_type, filename, size = _get_remote_file_info("http://example.com/downloads/data.bin")
assert filename == "data.bin"
assert mime_type == "application/octet-stream"
assert size == 456

def test_attachment_quoted_space_filename(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": 'attachment; filename="file name.jpg"',
"Content-Type": "image/jpeg",
"Content-Length": "789",
},
)
mime_type, filename, size = _get_remote_file_info("http://example.com/ignored")
assert filename == "file name.jpg"
assert mime_type == "image/jpeg"
assert size == 789

def test_attachment_filename_star_percent20(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": "attachment; filename*=UTF-8''file%20name.jpg",
"Content-Type": "image/jpeg",
},
)
mime_type, filename, _ = _get_remote_file_info("http://example.com/ignored")
assert filename == "file name.jpg"
assert mime_type == "image/jpeg"

def test_attachment_filename_star_chinese(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": "attachment; filename*=UTF-8''%E6%B5%8B%E8%AF%95%E6%96%87%E4%BB%B6.jpg",
"Content-Type": "image/jpeg",
},
)
mime_type, filename, _ = _get_remote_file_info("http://example.com/ignored")
assert filename == "测试文件.jpg"
assert mime_type == "image/jpeg"

def test_filename_from_url_when_no_header(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
# No Content-Disposition
"Content-Type": "text/plain",
"Content-Length": "12",
},
)
mime_type, filename, size = _get_remote_file_info("http://example.com/static/file.txt")
assert filename == "file.txt"
assert mime_type == "text/plain"
assert size == 12

def test_no_filename_in_url_or_header_generates_uuid_bin(self, monkeypatch: pytest.MonkeyPatch):
_mock_head(
monkeypatch,
{
"Content-Disposition": "inline",
"Content-Type": "application/octet-stream",
},
)
mime_type, filename, _ = _get_remote_file_info("http://example.com/test/")
# Should generate a random hex filename with .bin extension
assert re.match(r"^[0-9a-f]{32}\.bin$", filename) is not None
assert mime_type == "application/octet-stream"
||||
# The sandbox service endpoint.
|
||||
CODE_EXECUTION_ENDPOINT=http://sandbox:8194
|
||||
CODE_EXECUTION_API_KEY=dify-sandbox
|
||||
CODE_EXECUTION_SSL_VERIFY=True
|
||||
CODE_EXECUTION_POOL_MAX_CONNECTIONS=100
|
||||
CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS=20
|
||||
CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY=5.0
|
||||
CODE_MAX_NUMBER=9223372036854775807
|
||||
CODE_MIN_NUMBER=-9223372036854775808
|
||||
CODE_MAX_DEPTH=5
|
||||
@ -881,6 +877,7 @@ WORKFLOW_MAX_EXECUTION_STEPS=500
|
||||
WORKFLOW_MAX_EXECUTION_TIME=1200
|
||||
WORKFLOW_CALL_MAX_DEPTH=5
|
||||
MAX_VARIABLE_SIZE=204800
|
||||
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
|
||||
WORKFLOW_FILE_UPLOAD_LIMIT=10
|
||||
|
||||
# GraphEngine Worker Pool Configuration
|
||||
@ -1137,9 +1134,6 @@ SSRF_DEFAULT_TIME_OUT=5
|
||||
SSRF_DEFAULT_CONNECT_TIME_OUT=5
|
||||
SSRF_DEFAULT_READ_TIME_OUT=5
|
||||
SSRF_DEFAULT_WRITE_TIME_OUT=5
|
||||
SSRF_POOL_MAX_CONNECTIONS=100
|
||||
SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS=20
|
||||
SSRF_POOL_KEEPALIVE_EXPIRY=5.0
|
||||
|
||||
# ------------------------------
|
||||
# docker env var for specifying vector db type at startup
|
||||
|
||||
@ -382,10 +382,6 @@ x-shared-env: &shared-api-worker-env
|
||||
OWNER_TRANSFER_TOKEN_EXPIRY_MINUTES: ${OWNER_TRANSFER_TOKEN_EXPIRY_MINUTES:-5}
|
||||
CODE_EXECUTION_ENDPOINT: ${CODE_EXECUTION_ENDPOINT:-http://sandbox:8194}
|
||||
CODE_EXECUTION_API_KEY: ${CODE_EXECUTION_API_KEY:-dify-sandbox}
|
||||
CODE_EXECUTION_SSL_VERIFY: ${CODE_EXECUTION_SSL_VERIFY:-True}
|
||||
CODE_EXECUTION_POOL_MAX_CONNECTIONS: ${CODE_EXECUTION_POOL_MAX_CONNECTIONS:-100}
|
||||
CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS: ${CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS:-20}
|
||||
CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY: ${CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY:-5.0}
|
||||
CODE_MAX_NUMBER: ${CODE_MAX_NUMBER:-9223372036854775807}
|
||||
CODE_MIN_NUMBER: ${CODE_MIN_NUMBER:--9223372036854775808}
|
||||
CODE_MAX_DEPTH: ${CODE_MAX_DEPTH:-5}
|
||||
@ -402,6 +398,7 @@ x-shared-env: &shared-api-worker-env
|
||||
WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200}
|
||||
WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5}
|
||||
MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
|
||||
WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
|
||||
WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
|
||||
GRAPH_ENGINE_MIN_WORKERS: ${GRAPH_ENGINE_MIN_WORKERS:-1}
|
||||
GRAPH_ENGINE_MAX_WORKERS: ${GRAPH_ENGINE_MAX_WORKERS:-10}
|
||||
@ -500,9 +497,6 @@ x-shared-env: &shared-api-worker-env
|
||||
SSRF_DEFAULT_CONNECT_TIME_OUT: ${SSRF_DEFAULT_CONNECT_TIME_OUT:-5}
|
||||
SSRF_DEFAULT_READ_TIME_OUT: ${SSRF_DEFAULT_READ_TIME_OUT:-5}
|
||||
SSRF_DEFAULT_WRITE_TIME_OUT: ${SSRF_DEFAULT_WRITE_TIME_OUT:-5}
|
||||
SSRF_POOL_MAX_CONNECTIONS: ${SSRF_POOL_MAX_CONNECTIONS:-100}
|
||||
SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS: ${SSRF_POOL_MAX_KEEPALIVE_CONNECTIONS:-20}
|
||||
SSRF_POOL_KEEPALIVE_EXPIRY: ${SSRF_POOL_KEEPALIVE_EXPIRY:-5.0}
|
||||
EXPOSE_NGINX_PORT: ${EXPOSE_NGINX_PORT:-80}
|
||||
EXPOSE_NGINX_SSL_PORT: ${EXPOSE_NGINX_SSL_PORT:-443}
|
||||
POSITION_TOOL_PINS: ${POSITION_TOOL_PINS:-}
|
||||
|
||||
@ -108,8 +108,7 @@ const Configuration: FC = () => {
|
||||
const [hasFetchedDetail, setHasFetchedDetail] = useState(false)
|
||||
const isLoading = !hasFetchedDetail
|
||||
const pathname = usePathname()
|
||||
const appIdRegex = /\/app\/([^/]+)/
|
||||
const matched = appIdRegex.exec(pathname)
|
||||
const matched = pathname.match(/\/app\/([^/]+)/)
|
||||
const appId = (matched?.length && matched[1]) ? matched[1] : ''
|
||||
const [mode, setMode] = useState('')
|
||||
const [publishedConfig, setPublishedConfig] = useState<PublishConfig | null>(null)
|
||||
|
||||
@ -12,15 +12,9 @@ import { checkKeys } from '@/utils/var'
|
||||
const regex = /\{\{([^}]+)\}\}/g
|
||||
|
||||
export const getInputKeys = (value: string) => {
|
||||
const matches: string[] = []
|
||||
let match
|
||||
regex.lastIndex = 0
|
||||
while ((match = regex.exec(value)) !== null)
|
||||
matches.push(match[0])
|
||||
|
||||
const keys = matches.map((item) => {
|
||||
const keys = value.match(regex)?.map((item) => {
|
||||
return item.replace('{{', '').replace('}}', '')
|
||||
})
|
||||
}) || []
|
||||
const keyObj: Record<string, boolean> = {}
|
||||
// remove duplicate keys
|
||||
const res: string[] = []
|
||||
@ -75,8 +69,7 @@ const BlockInput: FC<IBlockInputProps> = ({
|
||||
const renderSafeContent = (value: string) => {
|
||||
const parts = value.split(/(\{\{[^}]+\}\}|\n)/g)
|
||||
return parts.map((part, index) => {
|
||||
const variableRegex = /^\{\{([^}]+)\}\}$/
|
||||
const variableMatch = variableRegex.exec(part)
|
||||
const variableMatch = part.match(/^\{\{([^}]+)\}\}$/)
|
||||
if (variableMatch) {
|
||||
return (
|
||||
<VarHighlight
|
||||
|
||||
@ -26,8 +26,7 @@ const AnnotationReply = ({
|
||||
const { t } = useTranslation()
|
||||
const router = useRouter()
|
||||
const pathname = usePathname()
|
||||
const appIdRegex = /\/app\/([^/]+)/
|
||||
const matched = appIdRegex.exec(pathname)
|
||||
const matched = pathname.match(/\/app\/([^/]+)/)
|
||||
const appId = (matched?.length && matched[1]) ? matched[1] : ''
|
||||
const featuresStore = useFeaturesStore()
|
||||
const annotationReply = useFeatures(s => s.features.annotationReply)
|
||||
|
||||
@ -28,8 +28,7 @@ const VoiceParamConfig = ({
|
||||
}: VoiceParamConfigProps) => {
|
||||
const { t } = useTranslation()
|
||||
const pathname = usePathname()
|
||||
const appIdRegex = /\/app\/([^/]+)/
|
||||
const matched = appIdRegex.exec(pathname)
|
||||
const matched = pathname.match(/\/app\/([^/]+)/)
|
||||
const appId = (matched?.length && matched[1]) ? matched[1] : ''
|
||||
const text2speech = useFeatures(state => state.features.text2speech)
|
||||
const featuresStore = useFeaturesStore()
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import {
|
||||
createContext,
|
||||
useContext,
|
||||
useEffect,
|
||||
useRef,
|
||||
} from 'react'
|
||||
import {
|
||||
@ -19,11 +18,13 @@ type Shape = {
|
||||
|
||||
export const createFileStore = (
|
||||
value: FileEntity[] = [],
|
||||
onChange?: (files: FileEntity[]) => void,
|
||||
) => {
|
||||
return create<Shape>(set => ({
|
||||
files: value ? [...value] : [],
|
||||
setFiles: (files) => {
|
||||
set({ files })
|
||||
onChange?.(files)
|
||||
},
|
||||
}))
|
||||
}
|
||||
@ -54,35 +55,9 @@ export const FileContextProvider = ({
|
||||
onChange,
|
||||
}: FileProviderProps) => {
|
||||
const storeRef = useRef<FileStore | undefined>(undefined)
|
||||
const onChangeRef = useRef<FileProviderProps['onChange']>(onChange)
|
||||
const isSyncingRef = useRef(false)
|
||||
|
||||
if (!storeRef.current)
|
||||
storeRef.current = createFileStore(value)
|
||||
|
||||
// keep latest onChange
|
||||
useEffect(() => {
|
||||
onChangeRef.current = onChange
|
||||
}, [onChange])
|
||||
|
||||
// subscribe to store changes and call latest onChange
|
||||
useEffect(() => {
|
||||
const store = storeRef.current!
|
||||
const unsubscribe = store.subscribe((state: Shape) => {
|
||||
if (isSyncingRef.current) return
|
||||
onChangeRef.current?.(state.files)
|
||||
})
|
||||
return unsubscribe
|
||||
}, [])
|
||||
|
||||
// sync external value into internal store when value changes
|
||||
useEffect(() => {
|
||||
const store = storeRef.current!
|
||||
const nextFiles = value ? [...value] : []
|
||||
isSyncingRef.current = true
|
||||
store.setState({ files: nextFiles })
|
||||
isSyncingRef.current = false
|
||||
}, [value])
|
||||
storeRef.current = createFileStore(value, onChange)
|
||||
|
||||
return (
|
||||
<FileContext.Provider value={storeRef.current}>
|
||||
|
||||
@ -11,10 +11,7 @@ export const preprocessLaTeX = (content: string) => {
|
||||
return content
|
||||
|
||||
const codeBlockRegex = /```[\s\S]*?```/g
|
||||
const codeBlocks: string[] = []
|
||||
let match
|
||||
while ((match = codeBlockRegex.exec(content)) !== null)
|
||||
codeBlocks.push(match[0])
|
||||
const codeBlocks = content.match(codeBlockRegex) || []
|
||||
const escapeReplacement = (str: string) => str.replace(/\$/g, '_TMP_REPLACE_DOLLAR_')
|
||||
let processedContent = content.replace(codeBlockRegex, 'CODE_BLOCK_PLACEHOLDER')
|
||||
|
||||
|
||||
@ -236,8 +236,7 @@ const Flowchart = (props: FlowchartProps) => {
|
||||
.split('\n')
|
||||
.map((line) => {
|
||||
// Gantt charts have specific syntax needs.
|
||||
const taskRegex = /^\s*([^:]+?)\s*:\s*(.*)/
|
||||
const taskMatch = taskRegex.exec(line)
|
||||
const taskMatch = line.match(/^\s*([^:]+?)\s*:\s*(.*)/)
|
||||
if (!taskMatch)
|
||||
return line // Not a task line, return as is.
|
||||
|
||||
@ -246,12 +245,7 @@ const Flowchart = (props: FlowchartProps) => {
|
||||
|
||||
// Rule 1: Correct multiple "after" dependencies ONLY if they exist.
|
||||
// This is a common mistake, e.g., "..., after task1, after task2, ..."
|
||||
const afterMatches: string[] = []
|
||||
const afterRegex = /after /g
|
||||
let afterMatch
|
||||
while ((afterMatch = afterRegex.exec(paramsStr)) !== null)
|
||||
afterMatches.push(afterMatch[0])
|
||||
const afterCount = afterMatches.length
|
||||
const afterCount = (paramsStr.match(/after /g) || []).length
|
||||
if (afterCount > 1)
|
||||
paramsStr = paramsStr.replace(/,\s*after\s+/g, ' ')
|
||||
|
||||
|
||||
@ -167,11 +167,10 @@ export function isMermaidCodeComplete(code: string): boolean {
|
||||
const isBalanced = true
|
||||
|
||||
// Check for common syntax errors
|
||||
const arrowRegex = /\S+\s*-->\s*\S+/
|
||||
const hasNoSyntaxErrors = !trimmedCode.includes('undefined')
|
||||
&& !trimmedCode.includes('[object Object]')
|
||||
&& trimmedCode.split('\n').every(line =>
|
||||
!(line.includes('-->') && !arrowRegex.test(line)))
|
||||
!(line.includes('-->') && !line.match(/\S+\s*-->\s*\S+/)))
|
||||
|
||||
return hasValidStart && isBalanced && hasNoSyntaxErrors
|
||||
}
|
||||
|
||||
@ -37,15 +37,10 @@ export const getInputVars = (text: string): ValueSelector[] => {
|
||||
if (!text || typeof text !== 'string')
|
||||
return []
|
||||
|
||||
const matches: string[] = []
|
||||
const regex = /{{#([^#]*)#}}/g
|
||||
let match
|
||||
while ((match = regex.exec(text)) !== null)
|
||||
matches.push(match[0])
|
||||
|
||||
if (matches && matches?.length > 0) {
|
||||
const allVars = text.match(/{{#([^#]*)#}}/g)
|
||||
if (allVars && allVars?.length > 0) {
|
||||
// {{#context#}}, {{#query#}} is not input vars
|
||||
const inputVars = matches
|
||||
const inputVars = allVars
|
||||
.filter(item => item.includes('.'))
|
||||
.map((item) => {
|
||||
const valueSelector = item.replace('{{#', '').replace('#}}', '').split('.')
|
||||
|
||||
@ -53,8 +53,7 @@ export const pluginManifestInMarketToPluginProps = (pluginManifest: PluginManife
|
||||
}
|
||||
|
||||
export const parseGitHubUrl = (url: string): GitHubUrlInfo => {
|
||||
const regex = /^https:\/\/github\.com\/([^/]+)\/([^/]+)\/?$/
|
||||
const match = regex.exec(url)
|
||||
const match = url.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/?$/)
|
||||
return match ? { isValid: true, owner: match[1], repo: match[2] } : { isValid: false }
|
||||
}
|
||||
|
||||
|
||||
@ -14,6 +14,16 @@ export const usePipelineConfig = () => {
|
||||
const pipelineId = useStore(s => s.pipelineId)
|
||||
const workflowStore = useWorkflowStore()
|
||||
|
||||
const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
|
||||
const { setWorkflowConfig } = workflowStore.getState()
|
||||
|
||||
setWorkflowConfig(config)
|
||||
}, [workflowStore])
|
||||
useWorkflowConfig(
|
||||
pipelineId ? `/rag/pipelines/${pipelineId}/workflows/draft/config` : '',
|
||||
handleUpdateWorkflowConfig,
|
||||
)
|
||||
|
||||
const handleUpdateNodesDefaultConfigs = useCallback((nodesDefaultConfigs: Record<string, any> | Record<string, any>[]) => {
|
||||
const { setNodesDefaultConfigs } = workflowStore.getState()
|
||||
let res: Record<string, any> = {}
|
||||
|
||||
@ -37,8 +37,7 @@ export type DuplicateAppModalProps = {
|
||||
|
||||
const DEFAULT_ICON = { type: 'emoji', icon: '🧿', background: '#EFF1F5' }
|
||||
const extractFileId = (url: string) => {
|
||||
const regex = /files\/(.+?)\/file-preview/
|
||||
const match = regex.exec(url)
|
||||
const match = url.match(/files\/(.+?)\/file-preview/)
|
||||
return match ? match[1] : null
|
||||
}
|
||||
const getIcon = (data?: ToolWithProvider) => {
|
||||
|
||||
@ -33,6 +33,13 @@ export const useWorkflowInit = () => {
|
||||
workflowStore.setState({ appId: appDetail.id, appName: appDetail.name })
|
||||
}, [appDetail.id, workflowStore])
|
||||
|
||||
const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
|
||||
const { setWorkflowConfig } = workflowStore.getState()
|
||||
|
||||
setWorkflowConfig(config)
|
||||
}, [workflowStore])
|
||||
useWorkflowConfig(`/apps/${appDetail.id}/workflows/draft/config`, handleUpdateWorkflowConfig)
|
||||
|
||||
const handleUpdateWorkflowFileUploadConfig = useCallback((config: FileUploadConfigResponse) => {
|
||||
const { setFileUploadConfig } = workflowStore.getState()
|
||||
setFileUploadConfig(config)
|
||||
|
||||
@ -35,6 +35,8 @@ export const NODE_LAYOUT_HORIZONTAL_PADDING = 60
|
||||
export const NODE_LAYOUT_VERTICAL_PADDING = 60
|
||||
export const NODE_LAYOUT_MIN_DISTANCE = 100
|
||||
|
||||
export const PARALLEL_DEPTH_LIMIT = 3
|
||||
|
||||
export const RETRIEVAL_OUTPUT_STRUCT = `{
|
||||
"content": "",
|
||||
"title": "",
|
||||
|
||||
@ -70,7 +70,7 @@ export const useNodesInteractions = () => {
|
||||
const reactflow = useReactFlow()
|
||||
const { store: workflowHistoryStore } = useWorkflowHistoryStore()
|
||||
const { handleSyncWorkflowDraft } = useNodesSyncDraft()
|
||||
const { getAfterNodesInSameBranch } = useWorkflow()
|
||||
const { checkNestedParallelLimit, getAfterNodesInSameBranch } = useWorkflow()
|
||||
const { getNodesReadOnly } = useNodesReadOnly()
|
||||
const { getWorkflowReadOnly } = useWorkflowReadOnly()
|
||||
const { handleSetHelpline } = useHelpline()
|
||||
@ -436,13 +436,21 @@ export const useNodesInteractions = () => {
|
||||
draft.push(newEdge)
|
||||
})
|
||||
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
if (checkNestedParallelLimit(newNodes, newEdges, targetNode)) {
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
|
||||
handleSyncWorkflowDraft()
|
||||
saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
|
||||
nodeId: targetNode?.id,
|
||||
})
|
||||
handleSyncWorkflowDraft()
|
||||
saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
|
||||
nodeId: targetNode?.id,
|
||||
})
|
||||
}
|
||||
else {
|
||||
const { setConnectingNodePayload, setEnteringNodePayload }
|
||||
= workflowStore.getState()
|
||||
setConnectingNodePayload(undefined)
|
||||
setEnteringNodePayload(undefined)
|
||||
}
|
||||
},
|
||||
[
|
||||
getNodesReadOnly,
|
||||
@ -450,6 +458,7 @@ export const useNodesInteractions = () => {
|
||||
workflowStore,
|
||||
handleSyncWorkflowDraft,
|
||||
saveStateToHistory,
|
||||
checkNestedParallelLimit,
|
||||
],
|
||||
)
|
||||
|
||||
@ -925,8 +934,13 @@ export const useNodesInteractions = () => {
|
||||
if (newEdge) draft.push(newEdge)
|
||||
})
|
||||
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
if (checkNestedParallelLimit(newNodes, newEdges, prevNode)) {
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
}
|
||||
else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if (!prevNodeId && nextNodeId) {
|
||||
const nextNodeIndex = nodes.findIndex(node => node.id === nextNodeId)
|
||||
@ -1073,11 +1087,17 @@ export const useNodesInteractions = () => {
|
||||
draft.push(newEdge)
|
||||
})
|
||||
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
if (checkNestedParallelLimit(newNodes, newEdges, nextNode)) {
|
||||
setNodes(newNodes)
|
||||
setEdges(newEdges)
|
||||
}
|
||||
else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
else {
|
||||
setNodes(newNodes)
|
||||
if (checkNestedParallelLimit(newNodes, edges)) setNodes(newNodes)
|
||||
else return false
|
||||
}
|
||||
}
|
||||
if (prevNodeId && nextNodeId) {
|
||||
@ -1277,6 +1297,7 @@ export const useNodesInteractions = () => {
|
||||
saveStateToHistory,
|
||||
workflowStore,
|
||||
getAfterNodesInSameBranch,
|
||||
checkNestedParallelLimit,
|
||||
nodesMetaDataMap,
|
||||
],
|
||||
)
|
||||
|
||||
@ -2,6 +2,7 @@ import {
|
||||
useCallback,
|
||||
} from 'react'
|
||||
import { uniqBy } from 'lodash-es'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import {
|
||||
getIncomers,
|
||||
getOutgoers,
|
||||
@ -23,7 +24,9 @@ import {
|
||||
useStore,
|
||||
useWorkflowStore,
|
||||
} from '../store'
|
||||
import { getParallelInfo } from '../utils'
|
||||
import {
|
||||
PARALLEL_DEPTH_LIMIT,
|
||||
SUPPORT_OUTPUT_VARS_NODE,
|
||||
} from '../constants'
|
||||
import type { IterationNodeType } from '../nodes/iteration/types'
|
||||
@ -41,6 +44,7 @@ import {
|
||||
import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants'
|
||||
import { CUSTOM_LOOP_START_NODE } from '@/app/components/workflow/nodes/loop-start/constants'
|
||||
import { basePath } from '@/utils/var'
|
||||
import { MAX_PARALLEL_LIMIT } from '@/config'
|
||||
import { useNodesMetaData } from '.'
|
||||
|
||||
export const useIsChatMode = () => {
|
||||
@ -50,7 +54,9 @@ export const useIsChatMode = () => {
|
||||
}
|
||||
|
||||
export const useWorkflow = () => {
|
||||
const { t } = useTranslation()
|
||||
const store = useStoreApi()
|
||||
const workflowStore = useWorkflowStore()
|
||||
const { getAvailableBlocks } = useAvailableBlocks()
|
||||
const { nodesMap } = useNodesMetaData()
|
||||
|
||||
@ -284,6 +290,20 @@ export const useWorkflow = () => {
|
||||
return isUsed
|
||||
}, [isVarUsedInNodes])
|
||||
|
||||
const checkParallelLimit = useCallback((nodeId: string, nodeHandle = 'source') => {
|
||||
const {
|
||||
edges,
|
||||
} = store.getState()
|
||||
const connectedEdges = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === nodeHandle)
|
||||
if (connectedEdges.length > MAX_PARALLEL_LIMIT - 1) {
|
||||
const { setShowTips } = workflowStore.getState()
|
||||
setShowTips(t('workflow.common.parallelTip.limit', { num: MAX_PARALLEL_LIMIT }))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}, [store, workflowStore, t])
|
||||
|
||||
const getRootNodesById = useCallback((nodeId: string) => {
|
||||
const {
|
||||
getNodes,
|
||||
@ -354,6 +374,33 @@ export const useWorkflow = () => {
|
||||
return startNodes
|
||||
}, [nodesMap, getRootNodesById])
|
||||
|
||||
const checkNestedParallelLimit = useCallback((nodes: Node[], edges: Edge[], targetNode?: Node) => {
|
||||
const startNodes = getStartNodes(nodes, targetNode)
|
||||
|
||||
for (let i = 0; i < startNodes.length; i++) {
|
||||
const {
|
||||
parallelList,
|
||||
hasAbnormalEdges,
|
||||
} = getParallelInfo(startNodes[i], nodes, edges)
|
||||
const { workflowConfig } = workflowStore.getState()
|
||||
|
||||
if (hasAbnormalEdges)
|
||||
return false
|
||||
|
||||
for (let i = 0; i < parallelList.length; i++) {
|
||||
const parallel = parallelList[i]
|
||||
|
||||
if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) {
|
||||
const { setShowTips } = workflowStore.getState()
|
||||
setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) }))
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}, [t, workflowStore, getStartNodes])
|
||||
|
||||
const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => {
|
||||
const {
|
||||
edges,
|
||||
@ -363,6 +410,9 @@ export const useWorkflow = () => {
|
||||
const sourceNode: Node = nodes.find(node => node.id === source)!
|
||||
const targetNode: Node = nodes.find(node => node.id === target)!
|
||||
|
||||
if (!checkParallelLimit(source!, sourceHandle || 'source'))
|
||||
return false
|
||||
|
||||
if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE)
|
||||
return false
|
||||
|
||||
@ -395,7 +445,7 @@ export const useWorkflow = () => {
|
||||
}
|
||||
|
||||
return !hasCycle(targetNode)
|
||||
}, [store, getAvailableBlocks])
|
||||
}, [store, checkParallelLimit, getAvailableBlocks])
|
||||
|
||||
return {
|
||||
getNodeById,
|
||||
@ -407,6 +457,8 @@ export const useWorkflow = () => {
|
||||
isVarUsedInNodes,
|
||||
removeUsedVarInNodes,
|
||||
isNodeVarsUsedInNodes,
|
||||
checkParallelLimit,
|
||||
checkNestedParallelLimit,
|
||||
isValidConnection,
|
||||
getBeforeNodeById,
|
||||
getIterationNodeChildren,
|
||||
|
||||
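For orientation, a minimal, hypothetical usage sketch (not part of the diff above): a small custom hook that guards a pending graph change with the two limit checks this hunk adds to useWorkflow. The hook name useGuardedGraphUpdate is an illustration only; the '@/app/components/workflow/hooks' import path is taken from the hunks below.

import { useCallback } from 'react'
import type { Edge, Node } from 'reactflow'
import { useWorkflow } from '@/app/components/workflow/hooks'

// Illustrative sketch only: returns true when both the per-branch fan-out
// limit (checkParallelLimit) and the nested-parallel depth limit
// (checkNestedParallelLimit) allow the pending change.
export const useGuardedGraphUpdate = () => {
  const { checkParallelLimit, checkNestedParallelLimit } = useWorkflow()

  return useCallback((nodes: Node[], edges: Edge[], sourceNodeId: string) => {
    if (!checkParallelLimit(sourceNodeId))
      return false
    return checkNestedParallelLimit(nodes, edges)
  }, [checkParallelLimit, checkNestedParallelLimit])
}
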
@ -71,6 +71,7 @@ import PanelContextmenu from './panel-contextmenu'
import NodeContextmenu from './node-contextmenu'
import SelectionContextmenu from './selection-contextmenu'
import SyncingDataModal from './syncing-data-modal'
import LimitTips from './limit-tips'
import { setupScrollToNodeListener } from './utils/node-navigation'
import {
useStore,
@ -377,6 +378,7 @@ export const Workflow: FC<WorkflowProps> = memo(({
/>
)
}
<LimitTips />
{children}
<ReactFlow
nodeTypes={nodeTypes}

web/app/components/workflow/limit-tips.tsx (new file, 39 lines)
@ -0,0 +1,39 @@
import {
RiAlertFill,
RiCloseLine,
} from '@remixicon/react'
import { useStore } from './store'
import ActionButton from '@/app/components/base/action-button'

const LimitTips = () => {
const showTips = useStore(s => s.showTips)
const setShowTips = useStore(s => s.setShowTips)

if (!showTips)
return null

return (
<div className='absolute bottom-16 left-1/2 z-[9] flex h-10 -translate-x-1/2 items-center rounded-xl border border-components-panel-border bg-components-panel-bg-blur p-2 shadow-md'>
<div
className='absolute inset-0 rounded-xl opacity-[0.4]'
style={{
background: 'linear-gradient(92deg, rgba(247, 144, 9, 0.25) 0%, rgba(255, 255, 255, 0.00) 100%)',
}}
></div>
<div className='flex h-5 w-5 items-center justify-center'>
<RiAlertFill className='h-4 w-4 text-text-warning-secondary' />
</div>
<div className='system-xs-medium mx-1 px-1 text-text-primary'>
{showTips}
</div>
<ActionButton
className='z-[1]'
onClick={() => setShowTips('')}
>
<RiCloseLine className='h-4 w-4' />
</ActionButton>
</div>
)
}

export default LimitTips
@ -84,8 +84,7 @@ const CodeEditor: FC<Props> = ({

const getUniqVarName = (varName: string) => {
if (varList.find(v => v.variable === varName)) {
const regex = /_(\d+)$/
const match = regex.exec(varName)
const match = varName.match(/_(\d+)$/)

const index = (() => {
if (match)

@ -12,6 +12,7 @@ import {
useAvailableBlocks,
useNodesInteractions,
useNodesReadOnly,
useWorkflow,
} from '@/app/components/workflow/hooks'
import BlockSelector from '@/app/components/workflow/block-selector'
import type {
@ -38,6 +39,7 @@ const Add = ({
const { handleNodeAdd } = useNodesInteractions()
const { nodesReadOnly } = useNodesReadOnly()
const { availableNextBlocks } = useAvailableBlocks(nodeData.type, nodeData.isInIteration || nodeData.isInLoop)
const { checkParallelLimit } = useWorkflow()

const handleSelect = useCallback<OnSelectBlock>((type, toolDefaultValue) => {
handleNodeAdd(
@ -50,11 +52,14 @@ const Add = ({
prevNodeSourceHandle: sourceHandle,
},
)
}, [handleNodeAdd])
}, [nodeId, sourceHandle, handleNodeAdd])

const handleOpenChange = useCallback((newOpen: boolean) => {
if (newOpen && !checkParallelLimit(nodeId, sourceHandle))
return

setOpen(newOpen)
}, [])
}, [checkParallelLimit, nodeId, sourceHandle])

const tip = useMemo(() => {
if (isFailBranch)

@ -22,6 +22,7 @@ import {
useIsChatMode,
useNodesInteractions,
useNodesReadOnly,
useWorkflow,
} from '../../../hooks'
import {
useStore,
@ -131,6 +132,7 @@ export const NodeSourceHandle = memo(({
const { availableNextBlocks } = useAvailableBlocks(data.type, data.isInIteration || data.isInLoop)
const isConnectable = !!availableNextBlocks.length
const isChatMode = useIsChatMode()
const { checkParallelLimit } = useWorkflow()

const connected = data._connectedSourceHandleIds?.includes(handleId)
const handleOpenChange = useCallback((v: boolean) => {
@ -138,8 +140,9 @@ export const NodeSourceHandle = memo(({
}, [])
const handleHandleClick = useCallback((e: MouseEvent) => {
e.stopPropagation()
setOpen(v => !v)
}, [])
if (checkParallelLimit(id, handleId))
setOpen(v => !v)
}, [checkParallelLimit, id, handleId])
const handleSelect = useCallback((type: BlockEnum, toolDefaultValue?: ToolDefaultValue) => {
handleNodeAdd(
{

@ -25,8 +25,7 @@ const SupportVarInput: FC<Props> = ({
const renderSafeContent = (inputValue: string) => {
const parts = inputValue.split(/(\{\{[^}]+\}\}|\n)/g)
return parts.map((part, index) => {
const variableRegex = /^\{\{([^}]+)\}\}$/
const variableMatch = variableRegex.exec(part)
const variableMatch = part.match(/^\{\{([^}]+)\}\}$/)
if (variableMatch) {
return (
<VarHighlight

@ -42,7 +42,6 @@ import type { RAGPipelineVariable } from '@/models/pipeline'

import {
AGENT_OUTPUT_STRUCT,
FILE_STRUCT,
HTTP_REQUEST_OUTPUT_STRUCT,
KNOWLEDGE_RETRIEVAL_OUTPUT_STRUCT,
LLM_OUTPUT_STRUCT,
@ -139,10 +138,6 @@ export const varTypeToStructType = (type: VarType): Type => {
[VarType.boolean]: Type.boolean,
[VarType.object]: Type.object,
[VarType.array]: Type.array,
[VarType.arrayString]: Type.array,
[VarType.arrayNumber]: Type.array,
[VarType.arrayObject]: Type.array,
[VarType.arrayFile]: Type.array,
} as any
)[type] || Type.string
)
@ -287,6 +282,15 @@ const findExceptVarInObject = (
children: filteredObj.children,
}
})

if (isFile && Array.isArray(childrenResult)) {
if (childrenResult.length === 0) {
childrenResult = OUTPUT_FILE_SUB_VARIABLES.map(key => ({
variable: key,
type: key === 'size' ? VarType.number : VarType.string,
}))
}
}
}
else {
childrenResult = []
@ -582,15 +586,17 @@ const formatItem = (
variable: outputKey,
type:
output.type === 'array'
? (`Array[${output.items?.type
? output.items.type.slice(0, 1).toLocaleUpperCase()
+ output.items.type.slice(1)
: 'Unknown'
? (`Array[${
output.items?.type
? output.items.type.slice(0, 1).toLocaleUpperCase()
+ output.items.type.slice(1)
: 'Unknown'
}]` as VarType)
: (`${output.type
? output.type.slice(0, 1).toLocaleUpperCase()
+ output.type.slice(1)
: 'Unknown'
: (`${
output.type
? output.type.slice(0, 1).toLocaleUpperCase()
+ output.type.slice(1)
: 'Unknown'
}` as VarType),
})
},
@ -684,10 +690,9 @@ const formatItem = (
const children = (() => {
if (isFile) {
return OUTPUT_FILE_SUB_VARIABLES.map((key) => {
const def = FILE_STRUCT.find(c => c.variable === key)
return {
variable: key,
type: def?.type || VarType.string,
type: key === 'size' ? VarType.number : VarType.string,
}
})
}
@ -709,10 +714,9 @@ const formatItem = (
if (isFile) {
return {
children: OUTPUT_FILE_SUB_VARIABLES.map((key) => {
const def = FILE_STRUCT.find(c => c.variable === key)
return {
variable: key,
type: def?.type || VarType.string,
type: key === 'size' ? VarType.number : VarType.string,
}
}),
}
@ -1202,10 +1206,7 @@ const matchNotSystemVars = (prompts: string[]) => {
prompts.forEach((prompt) => {
VAR_REGEX.lastIndex = 0
if (typeof prompt !== 'string') return

let match
while ((match = VAR_REGEX.exec(prompt)) !== null)
allVars.push(match[0])
allVars.push(...(prompt.match(VAR_REGEX) || []))
})
const uniqVars = uniq(allVars).map(v =>
v.replaceAll('{{#', '').replace('#}}', '').split('.'),

@ -18,6 +18,7 @@ import { Type } from '../../../llm/types'
import PickerStructurePanel from '@/app/components/workflow/nodes/_base/components/variable/object-child-tree-panel/picker'
import { isSpecialVar, varTypeToStructType } from './utils'
import type { Field } from '@/app/components/workflow/nodes/llm/types'
import { FILE_STRUCT } from '@/app/components/workflow/constants'
import { noop } from 'lodash-es'
import { CodeAssistant, MagicEdit } from '@/app/components/base/icons/src/vender/line/general'
import ManageInputField from './manage-input-field'
@ -105,9 +106,8 @@ const Item: FC<ItemProps> = ({

const objStructuredOutput: StructuredOutput | null = useMemo(() => {
if (!isObj) return null
const properties: Record<string, Field> = {}
const childrenVars = (itemData.children as Var[]) || []
childrenVars.forEach((c) => {
const properties: Record<string, Field> = {};
(isFile ? FILE_STRUCT : (itemData.children as Var[])).forEach((c) => {
properties[c.variable] = {
type: varTypeToStructType(c.type),
}
@ -120,7 +120,7 @@ const Item: FC<ItemProps> = ({
additionalProperties: false,
},
}
}, [isObj, itemData.children])
}, [isFile, isObj, itemData.children])

const structuredOutput = (() => {
if (isStructureOutput)
@ -448,5 +448,4 @@ const VarReferenceVars: FC<Props> = ({
</>
)
}

export default React.memo(VarReferenceVars)

@ -10,7 +10,7 @@ export const extractFunctionParams = (code: string, language: CodeLanguage) => {
[CodeLanguage.python3]: /def\s+main\s*\((.*?)\)/,
[CodeLanguage.javascript]: /function\s+main\s*\((.*?)\)/,
}
const match = patterns[language].exec(code)
const match = code.match(patterns[language])
const params: string[] = []

if (match?.[1]) {

@ -29,13 +29,7 @@ const parseCurl = (curlCommand: string): { node: HttpNodeType | null; error: str
params: '',
body: { type: BodyType.none, data: '' },
}
const regex = /(?:[^\s"']+|"[^"]*"|'[^']*')+/g
const matches: string[] = []
let match
while ((match = regex.exec(curlCommand)) !== null)
matches.push(match[0])

const args = matches
const args = curlCommand.match(/(?:[^\s"']+|"[^"]*"|'[^']*')+/g) || []
let hasData = false

for (let i = 1; i < args.length; i++) {
@ -81,8 +75,7 @@ const parseCurl = (curlCommand: string): { node: HttpNodeType | null; error: str

// To support command like `curl -F "file=@/path/to/file;type=application/zip"`
// the `;type=application/zip` should translate to `Content-Type: application/zip`
const typeRegex = /^(.+?);type=(.+)$/
const typeMatch = typeRegex.exec(value)
const typeMatch = value.match(/^(.+?);type=(.+)$/)
if (typeMatch) {
const [, actualValue, mimeType] = typeMatch
value = actualValue

@ -84,7 +84,7 @@ const ConditionItem = ({
) {
const regex = isCommonVariable ? COMMON_VARIABLE_REGEX : VARIABLE_REGEX
const matchedStartNumber = isCommonVariable ? 2 : 3
const matched = regex.exec(condition.value)
const matched = condition.value.match(regex)

if (matched?.length) {
return {

@ -55,7 +55,6 @@ const Panel: FC<NodePanelProps<ListFilterNodeType>> = ({
value={inputs.variable || []}
onChange={handleVarChanges}
filterVar={filterVar}
isSupportFileVar={false}
typePlaceHolder='Array'
/>
</Field>

@ -29,6 +29,10 @@ export type WorkflowSliceShape = {
setControlPromptEditorRerenderKey: (controlPromptEditorRerenderKey: number) => void
showImportDSLModal: boolean
setShowImportDSLModal: (showImportDSLModal: boolean) => void
showTips: string
setShowTips: (showTips: string) => void
workflowConfig?: Record<string, any>
setWorkflowConfig: (workflowConfig: Record<string, any>) => void
fileUploadConfig?: FileUploadConfigResponse
setFileUploadConfig: (fileUploadConfig: FileUploadConfigResponse) => void
}
@ -55,6 +59,10 @@ export const createWorkflowSlice: StateCreator<WorkflowSliceShape> = set => ({
setControlPromptEditorRerenderKey: controlPromptEditorRerenderKey => set(() => ({ controlPromptEditorRerenderKey })),
showImportDSLModal: false,
setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })),
showTips: '',
setShowTips: showTips => set(() => ({ showTips })),
workflowConfig: undefined,
setWorkflowConfig: workflowConfig => set(() => ({ workflowConfig })),
fileUploadConfig: undefined,
setFileUploadConfig: fileUploadConfig => set(() => ({ fileUploadConfig })),
})

@ -105,7 +105,7 @@ export function getLoopStartNode(loopId: string): Node {

export const genNewNodeTitleFromOld = (oldTitle: string) => {
const regex = /^(.+?)\s*\((\d+)\)\s*$/
const match = regex.exec(oldTitle)
const match = oldTitle.match(regex)

if (match) {
const title = match[1]

@ -1,8 +1,12 @@
import {
getConnectedEdges,
getIncomers,
getOutgoers,
} from 'reactflow'
import { v4 as uuid4 } from 'uuid'
import {
groupBy,
isEqual,
uniqBy,
} from 'lodash-es'
import type {
@ -164,6 +168,158 @@ export const changeNodesAndEdgesId = (nodes: Node[], edges: Edge[]) => {
return [newNodes, newEdges] as [Node[], Edge[]]
}

type ParallelInfoItem = {
parallelNodeId: string
depth: number
isBranch?: boolean
}
type NodeParallelInfo = {
parallelNodeId: string
edgeHandleId: string
depth: number
}
type NodeHandle = {
node: Node
handle: string
}
type NodeStreamInfo = {
upstreamNodes: Set<string>
downstreamEdges: Set<string>
}
export const getParallelInfo = (startNode: Node, nodes: Node[], edges: Edge[]) => {
if (!startNode)
throw new Error('Start node not found')

const parallelList = [] as ParallelInfoItem[]
const nextNodeHandles = [{ node: startNode, handle: 'source' }]
let hasAbnormalEdges = false

const traverse = (firstNodeHandle: NodeHandle) => {
const nodeEdgesSet = {} as Record<string, Set<string>>
const totalEdgesSet = new Set<string>()
const nextHandles = [firstNodeHandle]
const streamInfo = {} as Record<string, NodeStreamInfo>
const parallelListItem = {
parallelNodeId: '',
depth: 0,
} as ParallelInfoItem
const nodeParallelInfoMap = {} as Record<string, NodeParallelInfo>
nodeParallelInfoMap[firstNodeHandle.node.id] = {
parallelNodeId: '',
edgeHandleId: '',
depth: 0,
}

while (nextHandles.length) {
const currentNodeHandle = nextHandles.shift()!
const { node: currentNode, handle: currentHandle = 'source' } = currentNodeHandle
const currentNodeHandleKey = currentNode.id
const connectedEdges = edges.filter(edge => edge.source === currentNode.id && edge.sourceHandle === currentHandle)
const connectedEdgesLength = connectedEdges.length
const outgoers = nodes.filter(node => connectedEdges.some(edge => edge.target === node.id))
const incomers = getIncomers(currentNode, nodes, edges)

if (!streamInfo[currentNodeHandleKey]) {
streamInfo[currentNodeHandleKey] = {
upstreamNodes: new Set<string>(),
downstreamEdges: new Set<string>(),
}
}

if (nodeEdgesSet[currentNodeHandleKey]?.size > 0 && incomers.length > 1) {
const newSet = new Set<string>()
for (const item of totalEdgesSet) {
if (!streamInfo[currentNodeHandleKey].downstreamEdges.has(item))
newSet.add(item)
}
if (isEqual(nodeEdgesSet[currentNodeHandleKey], newSet)) {
parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth
nextNodeHandles.push({ node: currentNode, handle: currentHandle })
break
}
}

if (nodeParallelInfoMap[currentNode.id].depth > parallelListItem.depth)
parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth

outgoers.forEach((outgoer) => {
const outgoerConnectedEdges = getConnectedEdges([outgoer], edges).filter(edge => edge.source === outgoer.id)
const sourceEdgesGroup = groupBy(outgoerConnectedEdges, 'sourceHandle')
const incomers = getIncomers(outgoer, nodes, edges)

if (outgoers.length > 1 && incomers.length > 1)
hasAbnormalEdges = true

Object.keys(sourceEdgesGroup).forEach((sourceHandle) => {
nextHandles.push({ node: outgoer, handle: sourceHandle })
})
if (!outgoerConnectedEdges.length)
nextHandles.push({ node: outgoer, handle: 'source' })

const outgoerKey = outgoer.id
if (!nodeEdgesSet[outgoerKey])
nodeEdgesSet[outgoerKey] = new Set<string>()

if (nodeEdgesSet[currentNodeHandleKey]) {
for (const item of nodeEdgesSet[currentNodeHandleKey])
nodeEdgesSet[outgoerKey].add(item)
}

if (!streamInfo[outgoerKey]) {
streamInfo[outgoerKey] = {
upstreamNodes: new Set<string>(),
downstreamEdges: new Set<string>(),
}
}

if (!nodeParallelInfoMap[outgoer.id]) {
nodeParallelInfoMap[outgoer.id] = {
...nodeParallelInfoMap[currentNode.id],
}
}

if (connectedEdgesLength > 1) {
const edge = connectedEdges.find(edge => edge.target === outgoer.id)!
nodeEdgesSet[outgoerKey].add(edge.id)
totalEdgesSet.add(edge.id)

streamInfo[currentNodeHandleKey].downstreamEdges.add(edge.id)
streamInfo[outgoerKey].upstreamNodes.add(currentNodeHandleKey)

for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
streamInfo[item].downstreamEdges.add(edge.id)

if (!parallelListItem.parallelNodeId)
parallelListItem.parallelNodeId = currentNode.id

const prevDepth = nodeParallelInfoMap[currentNode.id].depth + 1
const currentDepth = nodeParallelInfoMap[outgoer.id].depth

nodeParallelInfoMap[outgoer.id].depth = Math.max(prevDepth, currentDepth)
}
else {
for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
streamInfo[outgoerKey].upstreamNodes.add(item)

nodeParallelInfoMap[outgoer.id].depth = nodeParallelInfoMap[currentNode.id].depth
}
})
}

parallelList.push(parallelListItem)
}

while (nextNodeHandles.length) {
const nodeHandle = nextNodeHandles.shift()!
traverse(nodeHandle)
}

return {
parallelList,
hasAbnormalEdges,
}
}

export const hasErrorHandleNode = (nodeType?: BlockEnum) => {
return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code
}

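As a quick illustration of what getParallelInfo returns, here is a hypothetical, standalone sketch (not part of the diff): the '@/app/components/workflow/utils' import path and the simplified node/edge shapes are assumptions. A start node fanning out to two targets yields one parallel branch rooted at the start node with depth 1.

import type { Edge, Node } from 'reactflow'
import { getParallelInfo } from '@/app/components/workflow/utils'

// Minimal helpers for a toy graph; real workflow nodes carry richer data.
const node = (id: string): Node => ({ id, position: { x: 0, y: 0 }, data: {} })
const edge = (id: string, source: string, target: string): Edge => ({ id, source, sourceHandle: 'source', target })

const nodes = [node('start'), node('a'), node('b')]
const edges = [edge('e1', 'start', 'a'), edge('e2', 'start', 'b')]

const { parallelList, hasAbnormalEdges } = getParallelInfo(nodes[0], nodes, edges)
// parallelList is roughly [{ parallelNodeId: 'start', depth: 1 }] and hasAbnormalEdges is false,
// so a limit of PARALLEL_DEPTH_LIMIT (or workflowConfig.parallel_depth_limit) would pass here.
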
@ -183,8 +183,7 @@ export default translation
if (fs.existsSync(toGenLanguageFilePath)) {
const originalContent = fs.readFileSync(toGenLanguageFilePath, 'utf8')
// Extract original template literal content for resolutionTooltip
const regex = /(resolutionTooltip):\s*`([^`]*)`/s
const originalMatch = regex.exec(originalContent)
const originalMatch = originalContent.match(/(resolutionTooltip):\s*`([^`]*)`/s)
if (originalMatch) {
const [fullMatch, key, value] = originalMatch
res = res.replace(

@ -10,8 +10,7 @@ function getNamespacesFromConfig() {
const configContent = fs.readFileSync(configPath, 'utf8')

// Extract NAMESPACES array using regex
const namespacesRegex = /const NAMESPACES = \[([\s\S]*?)\]/
const namespacesMatch = namespacesRegex.exec(configContent)
const namespacesMatch = configContent.match(/const NAMESPACES = \[([\s\S]*?)\]/)
if (!namespacesMatch) {
throw new Error('Could not find NAMESPACES array in i18next-config.ts')
}
@ -37,8 +36,7 @@ function getNamespacesFromTypes() {
const typesContent = fs.readFileSync(typesPath, 'utf8')

// Extract namespaces from Messages type
const messagesRegex = /export type Messages = \{([\s\S]*?)\}/
const messagesMatch = messagesRegex.exec(typesContent)
const messagesMatch = typesContent.match(/export type Messages = \{([\s\S]*?)\}/)
if (!messagesMatch) {
return null
}

@ -157,8 +157,7 @@ async function removeExtraKeysFromFile(language, fileName, extraKeys) {
const trimmedLine = line.trim()

// Track current object path
const keyRegex = /^(\w+)\s*:\s*{/
const keyMatch = keyRegex.exec(trimmedLine)
const keyMatch = trimmedLine.match(/^(\w+)\s*:\s*{/)
if (keyMatch) {
currentPath.push(keyMatch[1])
braceDepth++
@ -171,8 +170,7 @@ async function removeExtraKeysFromFile(language, fileName, extraKeys) {
}

// Check if this line matches our target key
const leafKeyRegex = /^(\w+)\s*:/
const leafKeyMatch = leafKeyRegex.exec(trimmedLine)
const leafKeyMatch = trimmedLine.match(/^(\w+)\s*:/)
if (leafKeyMatch) {
const fullPath = [...currentPath, leafKeyMatch[1]]
const fullPathString = fullPath.join('.')
@ -193,8 +191,7 @@ async function removeExtraKeysFromFile(language, fileName, extraKeys) {
const trimmedKeyLine = keyLine.trim()

// If key line ends with ":" (not ":", "{ " or complete value), it's likely multiline
const valueRegex = /:\s*['"`]/
if (trimmedKeyLine.endsWith(':') && !trimmedKeyLine.includes('{') && !valueRegex.test(trimmedKeyLine)) {
if (trimmedKeyLine.endsWith(':') && !trimmedKeyLine.includes('{') && !trimmedKeyLine.match(/:\s*['"`]/)) {
// Find the value lines that belong to this key
let currentLine = targetLineIndex + 1
let foundValue = false
@ -210,8 +207,7 @@ async function removeExtraKeysFromFile(language, fileName, extraKeys) {
}

// Check if this line starts a new key (indicates end of current value)
const keyStartRegex = /^\w+\s*:/
if (keyStartRegex.test(trimmed))
if (trimmed.match(/^\w+\s*:/))
break

// Check if this line is part of the value

@ -10,8 +10,7 @@ function getNamespacesFromConfig() {
const configContent = fs.readFileSync(configPath, 'utf8')

// Extract NAMESPACES array using regex
const namespacesRegex = /const NAMESPACES = \[([\s\S]*?)\]/
const namespacesMatch = namespacesRegex.exec(configContent)
const namespacesMatch = configContent.match(/const NAMESPACES = \[([\s\S]*?)\]/)
if (!namespacesMatch) {
throw new Error('Could not find NAMESPACES array in i18next-config.ts')
}

@ -4,12 +4,12 @@ const translation = {
title: '空白の知識パイプライン',
description: 'データ処理と構造を完全に制御できるカスタムパイプラインをゼロから作成します。',
},
backToKnowledge: 'ナレッジベースに戻る',
backToKnowledge: '知識に戻る',
caution: '注意',
importDSL: 'DSLファイルからインポートする',
errorTip: 'ナレッジベースの作成に失敗しました',
createKnowledge: 'ナレッジベースを作成する',
successTip: 'ナレッジベースが正常に作成されました',
createKnowledge: '知識を創造する',
successTip: '知識ベースが正常に作成されました',
},
templates: {
customized: 'カスタマイズされた',
@ -21,10 +21,10 @@ const translation = {
preview: 'プレビュー',
dataSource: 'データソース',
editInfo: '情報を編集する',
exportPipeline: 'パイプラインをエクスポートする',
exportPipeline: '輸出パイプライン',
saveAndProcess: '保存して処理する',
backToDataSource: 'データソースに戻る',
useTemplate: 'このナレッジパイプラインを使用してください',
useTemplate: 'この知識パイプラインを使用してください',
process: 'プロセス',
},
deletePipeline: {
@ -37,7 +37,7 @@ const translation = {
tip: '<CustomLink>ドキュメントに移動</CustomLink>して、ドキュメントを追加または管理してください。',
},
error: {
message: 'ナレッジパイプラインの公開に失敗しました',
message: '知識パイプラインの公開に失敗しました',
},
},
publishTemplate: {
@ -147,19 +147,19 @@ const translation = {
content: 'この操作は永久的です。以前の方法に戻すことはできません。変換することを確認してください。',
},
warning: 'この操作は元に戻せません。',
title: 'ナレッジパイプラインに変換する',
title: '知識パイプラインに変換する',
successMessage: 'データセットをパイプラインに正常に変換しました',
errorMessage: 'データセットをパイプラインに変換できませんでした',
descriptionChunk1: '既存のナレッジベースを文書処理のためにナレッジパイプラインを使用するように変換できます。',
descriptionChunk1: '既存の知識ベースを文書処理のためにナレッジパイプラインを使用するように変換できます。',
descriptionChunk2: '— よりオープンで柔軟なアプローチを採用し、私たちのマーケットプレイスからのプラグインへのアクセスを提供します。これにより、すべての将来のドキュメントに新しい処理方法が適用されることになります。',
},
knowledgeNameAndIcon: 'ナレッジの名前とアイコン',
knowledgeNameAndIcon: '知識の名前とアイコン',
inputField: '入力フィールド',
pipelineNameAndIcon: 'パイプライン名とアイコン',
knowledgePermissions: '権限',
knowledgeNameAndIconPlaceholder: 'ナレッジベースの名前を入力してください',
editPipelineInfo: 'パイプライン情報を編集する',
knowledgeDescription: 'ナレッジベースの説明',
knowledgeDescription: '知識の説明',
knowledgeDescriptionPlaceholder: 'このナレッジベースに何が含まれているかを説明してください。詳細な説明は、AIがデータセットの内容により正確にアクセスできるようにします。空の場合、Difyはデフォルトのヒット戦略を使用します。(オプション)',
}

@ -91,7 +91,6 @@ const remoteImageURLs = [hasSetWebPrefix ? new URL(`${process.env.NEXT_PUBLIC_WE
/** @type {import('next').NextConfig} */
const nextConfig = {
basePath: process.env.NEXT_PUBLIC_BASE_PATH || '',
transpilePackages: ['echarts', 'zrender'],
turbopack: {
rules: codeInspectorPlugin({
bundler: 'turbopack'

@ -102,17 +102,11 @@ export const getVars = (value: string) => {
if (!value)
return []

const matches: string[] = []
let match
varRegex.lastIndex = 0
while ((match = varRegex.exec(value)) !== null)
matches.push(match[0])

const keys = matches.filter((item) => {
const keys = value.match(varRegex)?.filter((item) => {
return ![CONTEXT_PLACEHOLDER_TEXT, HISTORY_PLACEHOLDER_TEXT, QUERY_PLACEHOLDER_TEXT, PRE_PROMPT_PLACEHOLDER_TEXT].includes(item)
}).map((item) => {
return item.replace('{{', '').replace('}}', '')
}).filter(key => key.length <= MAX_VAR_KEY_LENGTH)
}).filter(key => key.length <= MAX_VAR_KEY_LENGTH) || []
const keyObj: Record<string, boolean> = {}
// remove duplicate keys
const res: string[] = []
